Changeset - 9f8f7a65f90d
[Not reviewed]
0 7 0
Christopher Esterhuyse - 5 years ago 2020-09-23 16:56:44
christopher.esterhuyse@gmail.com
simplified setup procedure's reconnection business. got rid of the finicky waker and waker token, instead relying on simple arithmetic for the timeout. more doc comments in setup and endpoint modules
7 files changed with 313 insertions and 192 deletions:
0 comments (0 inline, 0 general)
src/common.rs
Show inline comments
 
@@ -91,26 +91,26 @@ pub(crate) enum NonsyncBlocker {
 
    ComponentExit,
 
    SyncBlockStart,
 
}
 
#[derive(Debug, Clone)]
 
pub(crate) enum SyncBlocker {
 
    Inconsistent,
 
    SyncBlockEnd,
 
    CouldntReadMsg(PortId),
 
    CouldntCheckFiring(PortId),
 
    PutMsg(PortId, Payload),
 
    NondetChoice { n: u16 },
 
}
 
struct DenseDebugHex<'a>(pub &'a [u8]);
 
struct DebuggableIter<I: Iterator<Item = T> + Clone, T: Debug>(pub(crate) I);
 
pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]);
 
pub(crate) struct DebuggableIter<I: Iterator<Item = T> + Clone, T: Debug>(pub(crate) I);
 
///////////////////// IMPL /////////////////////
 
impl IdParts for Id {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        (self.connector_id, self.u32_suffix)
 
    }
 
}
 
impl IdParts for PortId {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        self.0.id_parts()
 
    }
 
}
 
impl IdParts for ComponentId {
src/macros.rs
Show inline comments
 
@@ -5,20 +5,22 @@ Change the definition of these macros to control the logging level statically
 
macro_rules! log {
 
    (@BENCH, $logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
    (@MARK, $logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
    (@ENDPT, $logger:expr, $($arg:tt)*) => {{
 
        // ignore
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
    ($logger:expr, $($arg:tt)*) => {{
 
        // if let Some(w) = $logger.line_writer() {
 
        //     let _ = writeln!(w, $($arg)*);
 
        // }
 
    }};
 
}
src/runtime/communication.rs
Show inline comments
 
@@ -143,25 +143,25 @@ impl Connector {
 
        } else {
 
            Err(WrongStateError)
 
        }
 
    }
 

	
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError as Poe;
 
        let Self { unphased: cu, phased } = self;
 
        let info = cu.ips.port_info.get(&port).ok_or(Poe::UnknownPolarity)?;
 
        let info = cu.ips.port_info.map.get(&port).ok_or(Poe::UnknownPolarity)?;
 
        if info.owner != cu.native_component_id {
 
            return Err(Poe::PortUnavailable);
 
        }
 
        if info.polarity != expect_polarity {
 
            return Err(Poe::WrongPolarity);
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(Poe::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant
 
                Ok(batch)
 
            }
 
@@ -367,55 +367,55 @@ impl Connector {
 
            // compute the solution predicate to associate with this branch.
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // all firing ports have SpecVal::FIRING
 
                let firing_iter = to_get.iter().chain(to_put.keys()).copied();
 
                log!(
 
                    cu.logger(),
 
                    "New native with firing ports {:?}",
 
                    firing_iter.clone().collect::<Vec<_>>()
 
                );
 
                let firing_ports: HashSet<PortId> = firing_iter.clone().collect();
 
                for port in firing_iter {
 
                    let var = cu.ips.spec_var_for(port);
 
                    let var = cu.ips.port_info.spec_var_for(port);
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // all silent ports have SpecVal::SILENT
 
                for port in cu.ips.ports_owned_by(cu.native_component_id) {
 
                for port in cu.ips.port_info.ports_owned_by(cu.native_component_id) {
 
                    if firing_ports.contains(port) {
 
                        // this one is FIRING
 
                        continue;
 
                    }
 
                    let var = cu.ips.spec_var_for(*port);
 
                    let var = cu.ips.port_info.spec_var_for(*port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.logger(), "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        log!(&mut *cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
 
                predicate.inserted(native_spec_var, branch_spec_val)
 
            };
 
            log!(cu.logger(), "Native branch index={:?} has consistent {:?}", index, &predicate);
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(
 
                    cu.logger(),
 
                    "Native branch {} sending msg {:?} with putter {:?}",
 
                    index,
 
                    &msg,
 
                    putter
 
                );
 
                // sanity check
 
                assert_eq!(Putter, cu.ips.port_info.get(&putter).unwrap().polarity);
 
                assert_eq!(Putter, cu.ips.port_info.map.get(&putter).unwrap().polarity);
 
                rctx.putter_push(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if branch.is_ended() {
 
                // empty to_get set => already corresponds with a component solution
 
                log!(
 
                    cu.logger(),
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
@@ -571,25 +571,25 @@ impl Connector {
 
            }
 
        }
 
        log!(cu.logger(), "All proto components are blocked");
 
        // ...invariant established!
 

	
 
        log!(cu.logger(), "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // handle all buffered messages, sending them through endpoints / feeding them to components
 
            log!(cu.logger(), "Decision loop! have {} messages to recv", rctx.payload_inbox.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_pop() {
 
                log!(@MARK, cu.logger(), "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg);
 
                let getter_info = rctx.ips.port_info.get(&getter).unwrap();
 
                let getter_info = rctx.ips.port_info.map.get(&getter).unwrap();
 
                let cid = getter_info.owner; // the id of the component owning `getter` port
 
                assert_eq!(Getter, getter_info.polarity); // sanity check
 
                log!(
 
                    cu.logger(),
 
                    "Routing msg {:?} to {:?} via {:?}",
 
                    &send_payload_msg,
 
                    getter,
 
                    &getter_info.route
 
                );
 
                match getter_info.route {
 
                    Route::UdpEndpoint { index } => {
 
                        // this is a message sent over the network through a UDP endpoint
 
@@ -823,33 +823,33 @@ impl BranchingNative {
 
    // May result in discovering new component solutions,
 
    // or fork speculative branches if the message's predicate
 
    // is MORE SPECIFIC than the branches of the native
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
        bn_temp: MapTempGuard<'_, Predicate, NativeBranch>,
 
    ) {
 
        log!(cu.logger(), "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert_eq!(Getter, rctx.ips.port_info.get(&getter).unwrap().polarity);
 
        assert_eq!(Getter, rctx.ips.port_info.map.get(&getter).unwrap().polarity);
 
        let mut draining = bn_temp;
 
        let finished = &mut self.branches;
 
        std::mem::swap(draining.0, finished);
 
        // Visit all native's branches, and feed those whose current predicates are
 
        // consistent with that of the received message.
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger(), "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            let var = rctx.ips.spec_var_for(getter);
 
            let var = rctx.ips.port_info.spec_var_for(getter);
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch,
 
                // because the resulting branch would have an inconsistent predicate.
 
                // the existing branch asserts the getter port is SILENT
 
                log!(
 
                    cu.logger(),
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                Self::insert_branch_merging(finished, predicate, branch);
 
                continue;
 
            }
 
@@ -1037,59 +1037,59 @@ impl BranchingProtoComponent {
 
            );
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency
 
                B::CouldntReadMsg(port) => {
 
                    // sanity check: `CouldntReadMsg` returned IFF the message is unavailable
 
                    assert!(!branch.inner.inbox.contains_key(&port)); 
 
                    // This branch hit a proper blocker: progress awaits the receipt of some message. Exit the cycle.
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned
 
                    let var = rctx.ips.spec_var_for(port);
 
                    let var = rctx.ips.port_info.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // speculate on the two possible values of `var`. Schedule both branches to be rerun.
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check: The given port indeed has `Putter` polarity
 
                    assert_eq!(Putter, rctx.ips.port_info.get(&putter).unwrap().polarity);
 
                    assert_eq!(Putter, rctx.ips.port_info.map.get(&putter).unwrap().polarity);
 
                    // assign FIRING to this port's associated firing variable
 
                    let var = rctx.ips.spec_var_for(putter);
 
                    let var = rctx.ips.port_info.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        // Discard the branch, as it clearly has contradictory requirements for this value.
 
                        log!(cu.logger(), "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!",
 
                            proto_component_id, putter, var);
 
                        drop((predicate, branch));
 
                    } else {
 
                        // Note that this port has put this round,
 
                        // and assert that this isn't its 2nd time putting this round (otheriwse PDL programming error)
 
                        assert!(branch.inner.did_put_or_get.insert(putter));
 
                        log!(cu.logger(), "Proto component {:?} with pred {:?} putting payload {:?} on port {:?} (using var {:?})",
 
                            proto_component_id, &predicate, &payload, putter, var);
 
                        // Send the given payload (by buffering it).
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.putter_push(cu, putter, msg);
 
                        // Branch can still make progress. Schedule to be rerun
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
                B::SyncBlockEnd => {
 
                    // This branch reached the end of it's synchronous block
 
                    // assign all variables of owned ports that DIDN'T fire to SILENT
 
                    for port in rctx.ips.ports_owned_by(proto_component_id) {
 
                        let var = rctx.ips.spec_var_for(*port);
 
                    for port in rctx.ips.port_info.ports_owned_by(proto_component_id) {
 
                        let var = rctx.ips.port_info.spec_var_for(*port);
 
                        let actually_exchanged = branch.inner.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let speculated_to_fire = val == SpecVal::FIRING;
 
                        if actually_exchanged != speculated_to_fire {
 
                            log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} actually_exchanged={}, speculated_to_fire={}",
 
                                port, var, val, actually_exchanged, speculated_to_fire);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
                            return Ok(());
 
                        }
 
                    }
 
                    // submit solution for this component
 
@@ -1186,25 +1186,25 @@ impl BranchingProtoComponent {
 
                    Self::insert_branch_merging(&mut blocked, predicate, branch);
 
                    Self::insert_branch_merging(&mut unblocked, predicate2, branch2);
 
                }
 
            }
 
        }
 
        log!(cu.logger(), "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let (swap, _pcb_temps) = pcb_temps.split_first_mut(); // peel off ONE temp storage map
 
        let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0);
 
        BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
        // swap the blocked branches back
 
        std::mem::swap(blocked.0, &mut self.branches);
 
        log!(cu.logger(), "component settles down with branches: {:?}", branches.keys());
 
        log!(cu.logger(), "component settles down with branches: {:?}", self.branches.keys());
 
        Ok(())
 
    }
 

	
 
    // Insert a new speculate branch into the given storage,
 
    // MERGING it with an existing branch if their predicate keys clash.
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, ProtoComponentBranch>,
 
        predicate: Predicate,
 
        mut branch: ProtoComponentBranch,
 
    ) {
 
        let e = branches.entry(predicate);
 
        use std::collections::hash_map::Entry;
 
@@ -1353,83 +1353,83 @@ impl SolutionStorage {
 
        }
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    // Facilitates callback from the component to the connector runtime,
 
    // creating a new component and changing the given port's ownership to that
 
    // of the new component.
 
    pub(crate) fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // Sanity check! The moved ports are owned by this component to begin with
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
                self.ips.port_info.get(port).unwrap().owner
 
                self.ips.port_info.map.get(port).unwrap().owner
 
            );
 
        }
 
        // Create the new component, and schedule it to be run
 
        let new_cid = self.ips.id_manager.new_component_id();
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component {:?} with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            new_cid,
 
            &state,
 
            &moved_ports
 
        );
 
        self.unrun_components.push((new_cid, state));
 
        // Update the ownership of the moved ports
 
        for port in moved_ports.iter() {
 
            self.ips.port_info.get_mut(port).unwrap().owner = new_cid;
 
            self.ips.port_info.map.get_mut(port).unwrap().owner = new_cid;
 
        }
 
    }
 

	
 
    // Facilitates callback from the component to the connector runtime,
 
    // creating a new port-pair connected by an memory channel
 
    pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
        let mut new_cid_fn = || self.ips.id_manager.new_port_id();
 
        let [o, i] = [new_cid_fn(), new_cid_fn()];
 
        self.ips.port_info.insert(
 
        self.ips.port_info.map.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(i),
 
                polarity: Putter,
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        self.ips.port_info.insert(
 
        self.ips.port_info.map.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(o),
 
                polarity: Getter,
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        log!(
 
            self.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
 
            o,
 
            i
 
        );
 
        [o, i]
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    // The component calls the runtime back, inspecting whether it's associated
 
    // preidcate has already determined a (speculative) value for the given port's firing variable.
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
        let var = self.rctx.ips.spec_var_for(port);
 
        let var = self.rctx.ips.port_info.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 

	
 
    // The component calls the runtime back, trying to inspect a port's message
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        let maybe_msg = self.branch_inner.inbox.get(&port);
 
        if maybe_msg.is_some() {
 
            // Make a note that this component has received
 
            // this port's message 1+ times this round
 
            self.branch_inner.did_put_or_get.insert(port);
 
        }
 
        maybe_msg
src/runtime/endpoints.rs
Show inline comments
 
use super::*;
 

	
 
// A wrapper for some Read type, delegating read calls
 
// to the contained Read structure, but snooping on
 
// the number of bytes it reads, to be inspected later.
 
struct MonitoredReader<R: Read> {
 
    bytes: usize,
 
    r: R,
 
}
 

	
 
enum PollAndPopulateError {
 
    PollFailed,
 
    Timeout,
 
}
 

	
 
struct TryRecvAnyNetError {
 
    error: NetEndpointError,
 
    index: usize,
 
}
 
/////////////////////
 
impl NetEndpoint {
 
    // Returns the bincode configuration the NetEndpoint uses pervasively
 
    // for configuration on ser/de operations.
 
    fn bincode_opts() -> impl bincode::config::Options {
 
        // uses variable-length encoding everywhere; great!
 
        bincode::config::DefaultOptions::default()
 
    }
 

	
 
    // Attempt to return some deserializable T-type from
 
    // the inbox or network stream
 
    pub(super) fn try_recv<T: serde::de::DeserializeOwned>(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<T>, NetEndpointError> {
 
        use NetEndpointError as Nee;
 
        // populate inbox as much as possible
 
        // populate inbox with bytes as much as possible (mio::TcpStream is nonblocking)
 
        let before_len = self.inbox.len();
 
        'read_loop: loop {
 
            let res = self.stream.read_to_end(&mut self.inbox);
 
            match res {
 
                Err(e) if err_would_block(&e) => break 'read_loop,
 
                Ok(0) => break 'read_loop,
 
                Ok(_) => (),
 
                Err(_e) => return Err(Nee::BrokenNetEndpoint),
 
            }
 
        }
 
        log!(
 
            @ENDPT,
 
            logger,
 
            "Inbox bytes [{:x?}| {:x?}]",
 
            DenseDebugHex(&self.inbox[..before_len]),
 
            DenseDebugHex(&self.inbox[before_len..]),
 
        );
 
        // Try deserialize from the inbox, monitoring how many bytes
 
        // the deserialiation process consumes. In the event of
 
        // success, this makes clear where the message ends
 
        let mut monitored = MonitoredReader::from(&self.inbox[..]);
 
        use bincode::config::Options;
 
        match Self::bincode_opts().deserialize_from(&mut monitored) {
 
            Ok(msg) => {
 
                let msg_size = monitored.bytes_read();
 
                self.inbox.drain(0..(msg_size.try_into().unwrap()));
 
                // inbox[..msg_size] was deserialized into one message!
 
                self.inbox.drain(..msg_size);
 
                log!(
 
                    @ENDPT,
 
                    logger,
 
                    "Yielding msg. Inbox len {}-{}=={}: [{:?}]",
 
                    self.inbox.len() + msg_size,
 
                    msg_size,
 
                    self.inbox.len(),
 
                    DenseDebugHex(&self.inbox[..]),
 
                );
 
                Ok(Some(msg))
 
            }
 
            Err(e) => match *e {
 
                bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
 
                    // Contents of inbox insufficient for deserializing a message
 
                    Ok(None)
 
                }
 
                _ => Err(Nee::MalformedMessage),
 
            },
 
        }
 
    }
 

	
 
    // Send the given serializable type into the stream
 
    pub(super) fn send<T: serde::ser::Serialize>(
 
        &mut self,
 
        msg: &T,
 
    ) -> Result<(), NetEndpointError> {
 
        use bincode::config::Options;
 
        use NetEndpointError as Nee;
 
        Self::bincode_opts()
 
            .serialize_into(&mut self.stream, msg)
 
            .map_err(|_| Nee::BrokenNetEndpoint)?;
 
        let _ = self.stream.flush();
 
        Ok(())
 
    }
 
}
 

	
 
impl EndpointManager {
 
    pub(super) fn index_iter(&self) -> Range<usize> {
 
        0..self.num_net_endpoints()
 
    }
 
    pub(super) fn num_net_endpoints(&self) -> usize {
 
        self.net_endpoint_store.endpoint_exts.len()
 
    }
 

	
 
    // Setup-phase particular send procedure.
 
    // Used pervasively, allows for some brevity with the ? operator.
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|err| {
 
            ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 

	
 
    // Communication-phase particular send procedure.
 
    // Used pervasively, allows for some brevity with the ? operator.
 
    pub(super) fn send_to_comms(
 
        &mut self,
 
        index: usize,
 
        msg: &Msg,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        use UnrecoverableSyncError as Use;
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index })
 
    }
 
    pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
 
        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
        net_endpoint.send(msg).map_err(|err| {
 
            ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
 
        })
 
    }
 

	
 
    /// Receive the first message of any kind at all.
 
    /// Why not return SetupMsg? Because often this message will be forwarded to several others,
 
    /// and by returning a Msg, it can be serialized in-place (NetEndpoints allow the sending of Msg types!)
 
    pub(super) fn try_recv_any_setup(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: &Option<Instant>,
 
    ) -> Result<(usize, Msg), ConnectError> {
 
        ///////////////////////////////////////////
 
        // helper function, mapping a TryRecvAnySetup type error
 
        // into a ConnectError
 
        fn map_trane(
 
            trane: TryRecvAnyNetError,
 
            net_endpoint_store: &EndpointStore<NetEndpointExt>,
 
        ) -> ConnectError {
 
            ConnectError::NetEndpointSetupError(
 
                net_endpoint_store.endpoint_exts[trane.index]
 
                    .net_endpoint
 
                    .stream
 
                    .local_addr()
 
                    .unwrap(), // stream must already be connected
 
                trane.error,
 
            )
 
        }
 
        ///////////////////////////////////////////
 
        // try yield undelayed net message
 
        if let Some(tup) = self.undelayed_messages.pop() {
 
            log!(@ENDPT, logger, "RECV undelayed_msg {:?}", &tup);
 
            return Ok(tup);
 
        }
 
        loop {
 
            // try recv from some polled undrained NET endpoint
 
            if let Some(tup) = self
 
                .try_recv_undrained_net(logger)
 
                .map_err(|trane| map_trane(trane, &self.net_endpoint_store))?
 
            {
 
                return Ok(tup);
 
@@ -147,168 +171,183 @@ impl EndpointManager {
 
    // buffers all future round messages,
 
    // drops all previous round messages,
 
    // enqueues all current round SendPayload messages using rctx.getter_push
 
    // returns the first comm_ctrl_msg encountered
 
    // only polls until SOME message is enqueued
 
    pub(super) fn try_recv_any_comms(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        round_index: usize,
 
    ) -> Result<CommRecvOk, UnrecoverableSyncError> {
 
        ///////////////////////////////////////////
 
        // adds scoped functionality for EndpointManager
 
        impl EndpointManager {
 
            // Given some Msg structure in a particular context,
 
            // return a control message for the current round
 
            // if its a payload message, buffer it instead
 
            fn handle_msg(
 
                &mut self,
 
                cu: &mut impl CuUndecided,
 
                rctx: &mut RoundCtx,
 
                net_index: usize,
 
                msg: Msg,
 
                round_index: usize,
 
                some_message_enqueued: &mut bool,
 
            ) -> Option<(usize, CommCtrlMsg)> {
 
                let comm_msg_contents = match msg {
 
                    Msg::SetupMsg(..) => return None,
 
                    Msg::SetupMsg(..) => return None, // discard setup messages
 
                    Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&round_index) {
 
                        Ordering::Equal => comm_msg.contents,
 
                        Ordering::Equal => comm_msg.contents, // ok, keep going
 
                        Ordering::Less => {
 
                            // discard this message
 
                            log!(
 
                                cu.logger(),
 
                                "We are in round {}, but msg is for round {}. Discard",
 
                                comm_msg.round_index,
 
                                round_index,
 
                            );
 
                            return None;
 
                        }
 
                        Ordering::Greater => {
 
                            // "delay" this message, enqueueing it for a future round
 
                            log!(
 
                                cu.logger(),
 
                                "We are in round {}, but msg is for round {}. Buffer",
 
                                comm_msg.round_index,
 
                                round_index,
 
                            );
 
                            self.delayed_messages.push((net_index, Msg::CommMsg(comm_msg)));
 
                            return None;
 
                        }
 
                    },
 
                };
 
                // inspect the contents of this contemporary message, sorting it
 
                match comm_msg_contents {
 
                    CommMsgContents::CommCtrl(comm_ctrl_msg) => Some((net_index, comm_ctrl_msg)),
 
                    CommMsgContents::CommCtrl(comm_ctrl_msg) => {
 
                        // yes! this is a CommCtrlMsg
 
                        Some((net_index, comm_ctrl_msg))
 
                    }
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        // Enqueue this payload message
 
                        // Still not a CommCtrlMsg, so return None
 
                        let getter =
 
                            self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming;
 
                        rctx.getter_push(getter, send_payload_msg);
 
                        *some_message_enqueued = true;
 
                        None
 
                    }
 
                }
 
            }
 
        }
 
        use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use};
 
        ///////////////////////////////////////////
 
        let mut some_message_enqueued = false;
 
        // try yield undelayed net message
 
        // pop undelayed messages, handling them. Return the first CommCtrlMsg popped
 
        while let Some((net_index, msg)) = self.undelayed_messages.pop() {
 
            if let Some((net_index, msg)) =
 
                self.handle_msg(cu, rctx, net_index, msg, round_index, &mut some_message_enqueued)
 
            {
 
                return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
            }
 
        }
 
        loop {
 
            // try receive a net message
 
            // drain endpoints of incoming messages (without blocking)
 
            // return first CommCtrlMsg received
 
            while let Some((net_index, msg)) = self.try_recv_undrained_net(cu.logger())? {
 
                if let Some((net_index, msg)) = self.handle_msg(
 
                    cu,
 
                    rctx,
 
                    net_index,
 
                    msg,
 
                    round_index,
 
                    &mut some_message_enqueued,
 
                ) {
 
                    return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
                }
 
            }
 
            // try receive a udp message
 
            let recv_buffer = self.udp_in_buffer.as_mut_slice();
 
            while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() {
 
                let ee = &mut self.udp_endpoint_store.endpoint_exts[index];
 
                if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() {
 
                    // I received a payload!
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    if !ee.received_this_round {
 
                        let payload = Payload::from(&recv_buffer[..bytes_written]);
 
                        let port_spec_var = rctx.ips.spec_var_for(ee.getter_for_incoming);
 
                        let port_spec_var = rctx.ips.port_info.spec_var_for(ee.getter_for_incoming);
 
                        let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
 
                        rctx.getter_push(
 
                            ee.getter_for_incoming,
 
                            SendPayloadMsg { payload, predicate },
 
                        );
 
                        some_message_enqueued = true;
 
                        ee.received_this_round = true;
 
                    } else {
 
                        // lose the message!
 
                    }
 
                }
 
            }
 
            if some_message_enqueued {
 
                return Ok(CommRecvOk::NewPayloadMsgs);
 
            }
 
            // poll if time remains
 
            match self.poll_and_populate(cu.logger(), &rctx.deadline) {
 
                Ok(()) => {} // continue looping
 
                Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew),
 
                Err(Pape::PollFailed) => return Err(Use::PollFailed),
 
            }
 
        }
 
    }
 

	
 
    // Try receive some message from any net endpoint without blocking
 
    fn try_recv_undrained_net(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<(usize, Msg)>, TryRecvAnyNetError> {
 
        while let Some(index) = self.net_endpoint_store.polled_undrained.pop() {
 
            let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
            if let Some(msg) = net_endpoint
 
                .try_recv(logger)
 
                .map_err(|error| TryRecvAnyNetError { error, index })?
 
            {
 
                log!(@ENDPT, logger, "RECV polled_undrained {:?}", &msg);
 
                if !net_endpoint.inbox.is_empty() {
 
                    // there may be another message waiting!
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                }
 
                return Ok(Some((index, msg)));
 
            }
 
        }
 
        Ok(None)
 
    }
 

	
 
    // Poll the network, raising `polled_undrained` flags for endpoints
 
    // as they receive events.
 
    fn poll_and_populate(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: &Option<Instant>,
 
    ) -> Result<(), PollAndPopulateError> {
 
        use PollAndPopulateError as Pape;
 
        // No message yet. Do we have enough time to poll?
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Pape::Timeout)?)
 
        } else {
 
            None
 
        };
 
        // Yes we do! Poll with remaining time as poll deadline
 
        self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?;
 
        for event in self.events.iter() {
 
            match TokenTarget::from(event.token()) {
 
                TokenTarget::Waker => {
 
                    // Can ignore. Residual event from endpoint manager setup procedure
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                    log!(
 
                        @ENDPT,
 
                        logger,
 
                        "RECV poll event {:?} for NET endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.net_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
                TokenTarget::UdpEndpoint { index } => {
 
@@ -318,46 +357,55 @@ impl EndpointManager {
 
                        logger,
 
                        "RECV poll event {:?} for UDP endpoint index {:?}. undrained: {:?}",
 
                        &event,
 
                        index,
 
                        self.udp_endpoint_store.polled_undrained.iter()
 
                    );
 
                }
 
            }
 
        }
 
        self.events.clear();
 
        Ok(())
 
    }
 

	
 
    // Move all delayed messages to undelayed, making it possible to yield them
 
    pub(super) fn undelay_all(&mut self) {
 
        if self.undelayed_messages.is_empty() {
 
            // fast path
 
            std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages);
 
            return;
 
        }
 
        // slow path
 
        self.undelayed_messages.extend(self.delayed_messages.drain(..));
 
    }
 

	
 
    // End the synchronous round for the udp endpoints given the round decision
 
    pub(super) fn udp_endpoints_round_end(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        decision: &Decision,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        // retain received_from_this_round for use in pseudo_socket_api::recv_from
 
        log!(
 
            logger,
 
            "Ending round for {} udp endpoints",
 
            self.udp_endpoint_store.endpoint_exts.len()
 
        );
 
        use UnrecoverableSyncError as Use;
 
        if let Decision::Success(solution_predicate) = decision {
 
            // Similar to a native component, we commit the branch of the component
 
            // consistent with the predicate decided upon, making its effects visible
 
            // to the world outside the connector's internals.
 
            // In this case, this takes the form of emptying the component's outbox buffer,
 
            // actually sending payloads 'on the wire' as UDP messages.
 
            for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() {
 
                'outgoing_loop: for (payload_predicate, payload) in ee.outgoing_payloads.drain() {
 
                    if payload_predicate.assigns_subset(solution_predicate) {
 
                        ee.sock.send(payload.as_slice()).map_err(|e| {
 
                            println!("{:?}", e);
 
                            Use::BrokenUdpEndpoint { index }
 
                        })?;
 
                        log!(
 
                            logger,
 
                            "Sent payload {:?} with pred {:?} through Udp endpoint {}",
 
                            &payload,
 
                            &payload_predicate,
src/runtime/logging.rs
Show inline comments
 
use super::*;
 

	
 
// Used in the loggers' format string
 
fn secs_since_unix_epoch() -> f64 {
 
    std::time::SystemTime::now()
 
        .duration_since(std::time::UNIX_EPOCH)
 
        .map(|dur| dur.as_secs_f64())
 
        .unwrap_or(0.)
 
}
 
impl FileLogger {
 
    pub fn new(connector_id: ConnectorId, file: std::fs::File) -> Self {
 
        Self(connector_id, file)
 
    }
 
}
 
impl VecLogger {
src/runtime/mod.rs
Show inline comments
 
@@ -138,25 +138,25 @@ enum SetupMsg {
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 

	
 
// A data structure encoding the state of a connector, passed around
 
// during the session optimization procedure.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: HashMap<PortId, PortInfo>,
 
    port_info: PortInfoMap,
 
    endpoint_incoming_to_getter: Vec<PortId>,
 
    proto_components: HashMap<ComponentId, ComponentState>,
 
}
 

	
 
// Newtype wrapper for an Arc<ProtocolDescription>,
 
// such that it can be (de)serialized for transmission over the network.
 
#[derive(Debug, Clone)]
 
struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 

	
 
// Control message particular to the communication phase.
 
// as such, it's annotated with a round_index
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
@@ -304,30 +304,37 @@ struct PortInfo {
 
    polarity: Polarity,
 
    route: Route,
 
}
 

	
 
// Similar to `PortInfo`, but designed for communication during the setup procedure.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
    owner: ComponentId,
 
}
 

	
 
// Newtype around port info map, allowing the implementation of some
 
// useful methods
 
#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
 
struct PortInfoMap {
 
    map: HashMap<PortId, PortInfo>,
 
}
 

	
 
// A convenient substructure for containing port info and the ID manager.
 
// Houses the bulk of the connector's persistent state between rounds.
 
// It turns out several situations require access to both things.
 
#[derive(Debug, Clone)]
 
struct IdAndPortState {
 
    port_info: HashMap<PortId, PortInfo>,
 
    port_info: PortInfoMap,
 
    id_manager: IdManager,
 
}
 

	
 
// A component's setup-phase-specific data
 
#[derive(Debug)]
 
struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundEndedNative>, SyncError>,
 
}
 
@@ -425,25 +432,24 @@ struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 

	
 
// Parallels a mio::Token type, but more clearly communicates
 
// the way it identifies the evented structre it corresponds to.
 
// See runtime/setup for methods converting between TokenTarget and mio::Token
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
    Waker,
 
}
 

	
 
// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened,
 
// such that it can know when to continue polling, and when to block.
 
enum CommRecvOk {
 
    TimeoutWithoutNew,
 
    NewPayloadMsgs,
 
    NewControlMsg { net_index: usize, msg: CommCtrlMsg },
 
}
 
////////////////
 
fn err_would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
@@ -466,36 +472,33 @@ impl<T: std::cmp::Ord> VecSet<T> {
 
                self.vec.insert(index, element);
 
                true
 
            }
 
        }
 
    }
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
    fn pop(&mut self) -> Option<T> {
 
        self.vec.pop()
 
    }
 
}
 
impl IdAndPortState {
 
impl PortInfoMap {
 
    fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator<Item = &PortId> {
 
        self.port_info
 
            .iter()
 
            .filter(move |(_, port_info)| port_info.owner == owner)
 
            .map(|(port, _)| port)
 
        self.map.iter().filter(move |(_, port_info)| port_info.owner == owner).map(|(port, _)| port)
 
    }
 
    fn spec_var_for(&self, port: PortId) -> SpecVar {
 
        // Every port maps to a speculative variable
 
        // Two distinct ports map to the same variable
 
        // IFF they are two ends of the same logical channel.
 
        let info = self.port_info.get(&port).unwrap();
 
        let info = self.map.get(&port).unwrap();
 
        SpecVar(match info.polarity {
 
            Getter => port,
 
            Putter => info.peer.unwrap(),
 
        })
 
    }
 
}
 
impl SpecVarStream {
 
    fn next(&mut self) -> SpecVar {
 
        let phantom_port: PortId =
 
            Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }
 
                .into();
 
        SpecVar(phantom_port)
 
@@ -519,25 +522,25 @@ impl IdManager {
 
        SpecVarStream { connector_id: self.connector_id, port_suffix_stream }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_component_id(&mut self) -> ComponentId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() }
 
            .into()
 
    }
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!");
 
        log!(self.unphased.logger(), "Connector dropping. Goodbye!");
 
    }
 
}
 
// Given a slice of ports, return the first, if any, port is present repeatedly
 
fn duplicate_port(slice: &[PortId]) -> Option<PortId> {
 
    let mut vec = Vec::with_capacity(slice.len());
 
    for port in slice.iter() {
 
        match vec.binary_search(port) {
 
            Err(index) => vec.insert(index, *port),
 
            Ok(_) => return Some(*port),
 
        }
 
    }
 
    None
 
@@ -585,34 +588,34 @@ impl Connector {
 
    /// # Panics
 
    /// This function panics if the connector's (large) port id space is exhausted.
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let mut new_cid = || cu.ips.id_manager.new_port_id();
 
        // allocate two fresh port identifiers
 
        let [o, i] = [new_cid(), new_cid()];
 
        // store info for each:
 
        // - they are each others' peers
 
        // - they are owned by a local component with id `cid`
 
        // - polarity putter, getter respectively
 
        cu.ips.port_info.insert(
 
        cu.ips.port_info.map.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(i),
 
                owner: cu.native_component_id,
 
                polarity: Putter,
 
            },
 
        );
 
        cu.ips.port_info.insert(
 
        cu.ips.port_info.map.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(o),
 
                owner: cu.native_component_id,
 
                polarity: Getter,
 
            },
 
        );
 
        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 

	
 
@@ -633,41 +636,41 @@ impl Connector {
 
    ) -> Result<(), AddComponentError> {
 
        // Check for error cases first before modifying `cu`
 
        use AddComponentError as Ace;
 
        let cu = &self.unphased;
 
        if let Some(port) = duplicate_port(ports) {
 
            return Err(Ace::DuplicatePort(port));
 
        }
 
        let expected_polarities = cu.proto_description.component_polarities(identifier)?;
 
        if expected_polarities.len() != ports.len() {
 
            return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() });
 
        }
 
        for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) {
 
            let info = cu.ips.port_info.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            let info = cu.ips.port_info.map.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            if info.owner != cu.native_component_id {
 
                return Err(Ace::UnknownPort(port));
 
            }
 
            if info.polarity != expected_polarity {
 
                return Err(Ace::WrongPortPolarity { port, expected_polarity });
 
            }
 
        }
 
        // No errors! Time to modify `cu`
 
        // create a new component and identifier
 
        let cu = &mut self.unphased;
 
        let new_cid = cu.ips.id_manager.new_component_id();
 
        cu.proto_components
 
            .insert(new_cid, cu.proto_description.new_main_component(identifier, ports));
 
        // update the ownership of moved ports
 
        for port in ports.iter() {
 
            match cu.ips.port_info.get_mut(port) {
 
            match cu.ips.port_info.map.get_mut(port) {
 
                Some(port_info) => port_info.owner = new_cid,
 
                None => unreachable!(),
 
            }
 
        }
 
        Ok(())
 
    }
 
}
 
impl Predicate {
 
    #[inline]
 
    pub fn singleton(k: SpecVar, v: SpecVal) -> Self {
 
        Self::default().inserted(k, v)
 
    }
 
@@ -778,25 +781,25 @@ impl RoundCtx {
 
    // remove an arbitrary buffered message, along with the ID of the getter who receives it
 
    fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> {
 
        self.payload_inbox.pop()
 
    }
 

	
 
    // buffer a message along with the ID of the getter who receives it
 
    fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.payload_inbox.push((getter, msg));
 
    }
 

	
 
    // buffer a message along with the ID of the putter who sent it
 
    fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) {
 
        if let Some(getter) = self.ips.port_info.get(&putter).unwrap().peer {
 
        if let Some(getter) = self.ips.port_info.map.get(&putter).unwrap().peer {
 
            log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter);
 
            self.getter_push(getter, msg);
 
        } else {
 
            log!(cu.logger(), "Putter {:?} has no known peer!", putter);
 
            panic!("Putter {:?} has no known peer!");
 
        }
 
    }
 
}
 

	
 
impl<T: Debug + std::cmp::Ord> Debug for VecSet<T> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_set().entries(self.vec.iter()).finish()
src/runtime/setup.rs
Show inline comments
 
use crate::common::*;
 
use crate::runtime::*;
 

	
 
#[derive(Default)]
 
struct ExtraPortInfo {
 
    info: HashMap<PortId, PortInfo>,
 
    peers: HashMap<PortId, PortId>,
 
}
 

	
 
impl TokenTarget {
 
    // subdivides the domain of usize into
 
    // [NET_ENDPOINT][UDP_ENDPOINT  ]
 
    // ^0            ^usize::MAX/2   ^usize::MAX
 
    const HALFWAY_INDEX: usize = usize::MAX / 2;
 
    const MAX_INDEX: usize = usize::MAX;
 
    const WAKER_TOKEN: usize = Self::MAX_INDEX;
 
}
 
impl From<Token> for TokenTarget {
 
    fn from(Token(index): Token) -> Self {
 
        if index == Self::WAKER_TOKEN {
 
            TokenTarget::Waker
 
        } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
 
        if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
 
            TokenTarget::UdpEndpoint { index: shifted }
 
        } else {
 
            TokenTarget::NetEndpoint { index }
 
        }
 
    }
 
}
 
impl Into<Token> for TokenTarget {
 
    fn into(self) -> Token {
 
        match self {
 
            TokenTarget::Waker => Token(Self::WAKER_TOKEN),
 
            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
 
            TokenTarget::NetEndpoint { index } => Token(index),
 
        }
 
    }
 
}
 
impl Connector {
 
    /// Create a new connector structure with the given protocol description (via Arc to facilitate sharing).
 
    /// The resulting connector will start in the setup phase, and cannot be used for communication until the
 
    /// `connect` procedure completes.
 
    /// # Safety
 
    /// The correctness of the system's underlying distributed algorithms requires that no two
 
    /// connectors have the same ID. If the user does not know the identifiers of other connectors in the
 
@@ -51,120 +56,129 @@ impl Connector {
 
                proto_description,
 
                proto_components: Default::default(),
 
                logger,
 
                native_component_id,
 
                ips: IdAndPortState { id_manager, port_info: Default::default() },
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
            })),
 
        }
 
    }
 

	
 
    /// Conceptually, this returning [p0, g1] is sugar for:
 
    /// 1. create port pair [p0, g0]
 
    /// 2. create port pair [p1, g1]
 
    /// 3. create udp component with interface of moved ports [p1, g0]
 
    /// 4. return [p0, g1]
 
    pub fn new_udp_mediator_component(
 
        &mut self,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<[PortId; 2], WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let udp_cid = cu.ips.id_manager.new_component_id();
 
                // allocates 4 new port identifiers, two for each logical channel,
 
                // one channel per direction (into and out of the component)
 
                let mut npid = || cu.ips.id_manager.new_port_id();
 
                let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 

	
 
                cu.ips.port_info.insert(
 
                    nin,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Getter,
 
                        peer: Some(uout),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                cu.ips.port_info.insert(
 
                // allocate the native->udp_mediator channel's ports
 
                cu.ips.port_info.map.insert(
 
                    nout,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Putter,
 
                        peer: Some(uin),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                cu.ips.port_info.insert(
 
                cu.ips.port_info.map.insert(
 
                    uin,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Getter,
 
                        peer: Some(uin),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                cu.ips.port_info.insert(
 
                // allocate the udp_mediator->native channel's ports
 
                cu.ips.port_info.map.insert(
 
                    uout,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Putter,
 
                        peer: Some(uin),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                cu.ips.port_info.map.insert(
 
                    nin,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Getter,
 
                        peer: Some(uout),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                // allocate the two ports owned by the UdpMediator component
 
                // Remember to setup this UdpEndpoint setup during `connect` later.
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    getter_for_incoming: nin,
 
                });
 
                // Return the native's output, input port pair
 
                Ok([nout, nin])
 
            }
 
        }
 
    }
 

	
 
    /// Adds a "dangling" port to the connector in the setup phase,
 
    /// to be formed into channel during the connect procedure with the given
 
    /// transport layer information.
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                // allocate a single dangling port with a `None` peer (for now)
 
                let new_pid = cu.ips.id_manager.new_port_id();
 
                cu.ips.port_info.insert(
 
                cu.ips.port_info.map.insert(
 
                    new_pid,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        peer: None,
 
                        owner: cu.native_component_id,
 
                        polarity,
 
                    },
 
                );
 
                log!(
 
                    cu.logger,
 
                    "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
 
                    new_pid,
 
                    polarity,
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                // Remember to setup this NetEndpoint setup during `connect` later.
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
                    getter_for_incoming: new_pid,
 
                });
 
                Ok(new_pid)
 
            }
 
        }
 
    }
 

	
 
    /// Finalizes the connector's setup procedure and forms a distributed system with
 
    /// all other connectors reachable through network channels. This procedure represents
 
@@ -177,389 +191,410 @@ impl Connector {
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup(setup) => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                let (mut endpoint_manager, mut extra_port_info) = setup_endpoints_and_pair_ports(
 
                    &mut *cu.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &mut cu.ips.port_info,
 
                    &cu.ips.port_info,
 
                    &deadline,
 
                )?;
 
                log!(
 
                    cu.logger,
 
                    "Successfully connected {} endpoints. info now {:#?} {:#?}",
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len(),
 
                    &cu.ips.port_info,
 
                    &endpoint_manager,
 
                );
 
                // leader election and tree construction
 
                // leader election and tree construction. Learn our role in the consensus tree,
 
                // from learning who are our children/parents (neighbors) in the consensus tree.
 
                let neighborhood = init_neighborhood(
 
                    cu.ips.id_manager.connector_id,
 
                    &mut *cu.logger,
 
                    &mut endpoint_manager,
 
                    &deadline,
 
                )?;
 
                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                // Put it all together with an initial round index of zero.
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                    round_result: Ok(None), // no previous round yet
 
                };
 
                if cfg!(feature = "session_optimization") {
 
                    // Perform the session optimization procedure, which may modify the
 
                    // internals of the connector, rerouting ports, moving around connectors etc.
 
                    session_optimize(cu, &mut comm, &deadline)?;
 
                }
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                // Connect procedure successful! Commit changes by...
 
                // ... commiting new port info for ConnectorUnphased
 
                for (port, info) in extra_port_info.info.drain() {
 
                    cu.ips.port_info.map.insert(port, info);
 
                }
 
                for (port, peer) in extra_port_info.peers.drain() {
 
                    cu.ips.port_info.map.get_mut(&port).unwrap().peer = Some(peer);
 
                }
 
                // ... replacing the connector's phase to "communication"
 
                *phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 
fn new_endpoint_manager(
 

	
 
// Given a set of net_ and udp_ endpoints to setup,
 
// port information to flesh out (by discovering peers through channels)
 
// and a deadline in which to do it,
 
// try to return:
 
// - An EndpointManager, containing all the set up endpoints
 
// - new information about ports acquired through the newly-created channels
 
fn setup_endpoints_and_pair_ports(
 
    logger: &mut dyn Logger,
 
    net_endpoint_setups: &[NetEndpointSetup],
 
    udp_endpoint_setups: &[UdpEndpointSetup],
 
    port_info: &mut HashMap<PortId, PortInfo>,
 
    port_info: &PortInfoMap,
 
    deadline: &Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 
) -> Result<(EndpointManager, ExtraPortInfo), ConnectError> {
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(300);
 
    struct WakerState {
 
        continue_signal: AtomicBool,
 
        waker: mio::Waker,
 
    }
 
    impl WakerState {
 
        fn waker_loop(&self) {
 
            while self.continue_signal.load(SeqCst) {
 
                std::thread::sleep(WAKER_PERIOD);
 
                let _ = self.waker.wake();
 
            }
 
        }
 
        fn waker_stop(&self) {
 
            self.continue_signal.store(false, SeqCst);
 
            // TODO keep waker registered?
 
        }
 
    }
 
    struct Todo {
 
    const RETRY_PERIOD: Duration = Duration::from_millis(200);
 

	
 
    // The structure shared between this ("setup") thread and that of the waker.
 
    // The waker thread periodically sends signals.
 
    // struct WakerState {
 
    //     continue_signal: AtomicBool,
 
    //     waker: mio::Waker,
 
    // }
 
    // impl WakerState {
 
    //     // The waker thread runs this UNTIL the continue signal is set to false
 
    //     fn waker_loop(&self) {
 
    //         while self.continue_signal.load(SeqCst) {
 
    //             std::thread::sleep(WAKER_PERIOD);
 
    //             let _ = self.waker.wake();
 
    //         }
 
    //     }
 
    //     // The setup thread thread runs this to set the continue signal to false.
 
    //     fn waker_stop(&self) {
 
    //         self.continue_signal.store(false, SeqCst);
 
    //     }
 
    // }
 

	
 
    // The data for a net endpoint's setup in progress
 
    struct NetTodo {
 
        // becomes completed once sent_local_port && recv_peer_port.is_some()
 
        // we send local port if we haven't already and we receive a writable event
 
        // we recv peer port if we haven't already and we receive a readbale event
 
        todo_endpoint: TodoEndpoint,
 
        todo_endpoint: NetTodoEndpoint,
 
        endpoint_setup: NetEndpointSetup,
 
        sent_local_port: bool,          // true <-> I've sent my local port
 
        recv_peer_port: Option<PortId>, // Some(..) <-> I've received my peer's port
 
    }
 

	
 
    // The data for a udp endpoint's setup in progress
 
    struct UdpTodo {
 
        // becomes completed once we receive our first writable event
 
        getter_for_incoming: PortId,
 
        sock: UdpSocket,
 
    }
 
    enum TodoEndpoint {
 
        Accepting(TcpListener),
 
        NetEndpoint(NetEndpoint),
 

	
 
    // Substructure of `NetTodo`, which represents the endpoint itself
 
    enum NetTodoEndpoint {
 
        Accepting(TcpListener),       // awaiting it's peer initiating the connection
 
        PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel
 
    }
 

	
 
    ////////////////////////////////////////////
 

	
 
    // 1. Start to construct EndpointManager
 
    let mut waker_state: Option<Arc<WakerState>> = None;
 
    // Start to construct our return values
 
    // let mut waker_state: Option<Arc<WakerState>> = None;
 
    let mut extra_port_info = ExtraPortInfo::default();
 
    let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
 
    let mut events =
 
        Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4);
 
    let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()];
 
    let mut delayed_messages = vec![];
 
    let mut last_retry_at = Instant::now();
 

	
 
    // 2. Create net/udp TODOs, each already registered with poll
 
    // Create net/udp todo structures, each already registered with poll
 
    let mut net_todos = net_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let token = TokenTarget::NetEndpoint { index }.into();
 
            log!(logger, "Net endpoint {} beginning setup with {:?}", index, &endpoint_setup);
 
            let todo_endpoint = if let EndpointPolarity::Active = endpoint_setup.endpoint_polarity {
 
                let mut stream = TcpStream::connect(endpoint_setup.sock_addr)
 
                    .expect("mio::TcpStream connect should not fail!");
 
                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] })
 
                NetTodoEndpoint::PeerInfoRecving(NetEndpoint { stream, inbox: vec![] })
 
            } else {
 
                let mut listener = TcpListener::bind(endpoint_setup.sock_addr)
 
                    .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?;
 
                poll.registry().register(&mut listener, token, BOTH).unwrap();
 
                TodoEndpoint::Accepting(listener)
 
                NetTodoEndpoint::Accepting(listener)
 
            };
 
            Ok(Todo {
 
            Ok(NetTodo {
 
                todo_endpoint,
 
                sent_local_port: false,
 
                recv_peer_port: None,
 
                endpoint_setup: endpoint_setup.clone(),
 
            })
 
        })
 
        .collect::<Result<Vec<Todo>, ConnectError>>()?;
 
        .collect::<Result<Vec<NetTodo>, ConnectError>>()?;
 
    let udp_todos = udp_endpoint_setups
 
        .iter()
 
        .enumerate()
 
        .map(|(index, endpoint_setup)| {
 
            let mut sock = UdpSocket::bind(endpoint_setup.local_addr)
 
                .map_err(|_| Ce::BindFailed(endpoint_setup.local_addr))?;
 
            sock.connect(endpoint_setup.peer_addr)
 
                .map_err(|_| Ce::UdpConnectFailed(endpoint_setup.peer_addr))?;
 
            poll.registry()
 
                .register(&mut sock, TokenTarget::UdpEndpoint { index }.into(), Interest::WRITABLE)
 
                .unwrap();
 
            Ok(UdpTodo { sock, getter_for_incoming: endpoint_setup.getter_for_incoming })
 
        })
 
        .collect::<Result<Vec<UdpTodo>, ConnectError>>()?;
 

	
 
    // Initially, (1) no net connections have failed, and (2) all udp and net endpoint setups are incomplete
 
    let mut net_connect_retry_later: HashSet<usize> = Default::default();
 
    // Initially no net connections have failed, and all udp and net endpoint setups are incomplete
 
    let mut net_connect_to_retry: HashSet<usize> = Default::default();
 
    let mut setup_incomplete: HashSet<TokenTarget> = {
 
        let net_todo_targets_iter =
 
            (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index });
 
        let udp_todo_targets_iter =
 
            (0..udp_todos.len()).map(|index| TokenTarget::UdpEndpoint { index });
 
        net_todo_targets_iter.chain(udp_todo_targets_iter).collect()
 
    };
 
    // progress by reacting to poll events. continue until every endpoint is set up
 
    while !setup_incomplete.is_empty() {
 
        // recompute the time left to poll for progress
 
        let remaining = if let Some(deadline) = deadline {
 
            Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?)
 
            deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?.min(RETRY_PERIOD)
 
        } else {
 
            None
 
            RETRY_PERIOD
 
        };
 
        poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?;
 
        for event in events.iter() {
 
            let token = event.token();
 
            let token_target = TokenTarget::from(token);
 
            match token_target {
 
                TokenTarget::Waker => {
 
                    log!(
 
                        logger,
 
                        "Notification from waker. connect_failed is {:?}",
 
                        net_connect_retry_later.iter()
 
                    );
 
                    assert!(waker_state.is_some());
 
                    for net_index in net_connect_retry_later.drain() {
 
        // block until either
 
        // (a) `events` has been populated with 1+ elements
 
        // (b) timeout elapses, or
 
        // (c) RETRY_PERIOD elapses
 
        poll.poll(&mut events, Some(remaining)).map_err(|_| Ce::PollFailed)?;
 
        if last_retry_at.elapsed() > RETRY_PERIOD {
 
            // Retry all net connections and reset `last_retry_at`
 
            last_retry_at = Instant::now();
 
            for net_index in net_connect_to_retry.drain() {
 
                // Restart connect procedure for this net endpoint
 
                let net_todo = &mut net_todos[net_index];
 
                log!(
 
                    logger,
 
                    "Restarting connection with endpoint {:?} {:?}",
 
                    net_index,
 
                    net_todo.endpoint_setup.sock_addr
 
                );
 
                match &mut net_todo.todo_endpoint {
 
                            TodoEndpoint::NetEndpoint(endpoint) => {
 
                                let mut new_stream =
 
                                    TcpStream::connect(net_todo.endpoint_setup.sock_addr)
 
                    NetTodoEndpoint::PeerInfoRecving(endpoint) => {
 
                        let mut new_stream = TcpStream::connect(net_todo.endpoint_setup.sock_addr)
 
                            .expect("mio::TcpStream connect should not fail!");
 
                        std::mem::swap(&mut endpoint.stream, &mut new_stream);
 
                        let token = TokenTarget::NetEndpoint { index: net_index }.into();
 
                                poll.registry()
 
                                    .register(&mut endpoint.stream, token, BOTH)
 
                                    .unwrap();
 
                        poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap();
 
                    }
 
                    _ => unreachable!(),
 
                }
 
            }
 
        }
 
        for event in events.iter() {
 
            let token = event.token();
 
            // figure out which endpoint the event belonged to
 
            let token_target = TokenTarget::from(token);
 
            match token_target {
 
                TokenTarget::UdpEndpoint { index } => {
 
                    // UdpEndpoints are easy to complete.
 
                    // Their setup event just has to succeed without error
 
                    if !setup_incomplete.contains(&token_target) {
 
                        // spurious wakeup. this endpoint has already been set up!
 
                        continue;
 
                    }
 
                    let udp_todo: &UdpTodo = &udp_todos[index];
 
                    if event.is_error() {
 
                        return Err(Ce::BindFailed(udp_todo.sock.local_addr().unwrap()));
 
                    }
 
                    setup_incomplete.remove(&token_target);
 
                }
 
                TokenTarget::NetEndpoint { index } => {
 
                    // NetEndpoints are complex to complete,
 
                    // they must accept/connect to their peer,
 
                    // and then exchange port info successfully
 
                    let net_todo = &mut net_todos[index];
 
                    if let TodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint {
 
                        // FIRST try complete this connection
 
                    if let NetTodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint {
 
                        // Passive endpoint that will first try accept the peer's connection
 
                        match listener.accept() {
 
                            Err(e) if err_would_block(&e) => continue, // spurious wakeup
 
                            Err(_) => {
 
                                log!(logger, "accept() failure on index {}", index);
 
                                return Err(Ce::AcceptFailed(listener.local_addr().unwrap()));
 
                            }
 
                            Ok((mut stream, peer_addr)) => {
 
                                // successfully accepted the active peer
 
                                // reusing the token, but now for the stream and not the listener
 
                                poll.registry().deregister(listener).unwrap();
 
                                poll.registry().register(&mut stream, token, BOTH).unwrap();
 
                                log!(
 
                                    logger,
 
                                    "Endpoint[{}] accepted a connection from {:?}",
 
                                    index,
 
                                    peer_addr
 
                                );
 
                                let net_endpoint = NetEndpoint { stream, inbox: vec![] };
 
                                net_todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint);
 
                                net_todo.todo_endpoint =
 
                                    NetTodoEndpoint::PeerInfoRecving(net_endpoint);
 
                            }
 
                        }
 
                    }
 
                    if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut net_todo.todo_endpoint {
 
                    // OK now let's try and finish exchanging port info
 
                    if let NetTodoEndpoint::PeerInfoRecving(net_endpoint) =
 
                        &mut net_todo.todo_endpoint
 
                    {
 
                        if event.is_error() {
 
                            // event signals some error! :(
 
                            if net_todo.endpoint_setup.endpoint_polarity
 
                                == EndpointPolarity::Passive
 
                            {
 
                                // right now you cannot retry an acceptor. return failure
 
                                // breaking as the acceptor is currently unrecoverable
 
                                return Err(Ce::AcceptFailed(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                ));
 
                            }
 
                            // this actively-connecting endpoint failed to connect!
 
                            if net_connect_retry_later.insert(index) {
 
                                log!(
 
                                    logger,
 
                                    "Connection failed for {:?}. List is {:?}",
 
                                    index,
 
                                    net_connect_retry_later.iter()
 
                                );
 
                                poll.registry().deregister(&mut net_endpoint.stream).unwrap();
 
                            } else {
 
                                // spurious wakeup. already scheduled to retry connect later
 
                                continue;
 
                            }
 
                            if waker_state.is_none() {
 
                                log!(logger, "First connect failure. Starting waker thread");
 
                                let arc = Arc::new(WakerState {
 
                                    waker: mio::Waker::new(
 
                                        poll.registry(),
 
                                        TokenTarget::Waker.into(),
 
                                    )
 
                                    .unwrap(),
 
                                    continue_signal: true.into(),
 
                                });
 
                                let moved_arc = arc.clone();
 
                                waker_state = Some(arc);
 
                                std::thread::spawn(move || moved_arc.waker_loop());
 
                            }
 
                            // We will schedule it for a retry
 
                            net_connect_to_retry.insert(index);
 
                            continue;
 
                        }
 
                        // event wasn't ERROR
 
                        if net_connect_retry_later.contains(&index) {
 
                        if net_connect_to_retry.contains(&index) {
 
                            // spurious wakeup. already scheduled to retry connect later
 
                            continue;
 
                        }
 
                        if !setup_incomplete.contains(&token_target) {
 
                            // spurious wakeup. this endpoint has already been completed!
 
                            if event.is_readable() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            continue;
 
                        }
 
                        let local_info = port_info
 
                            .get_mut(&net_todo.endpoint_setup.getter_for_incoming)
 
                            .unwrap();
 
                            .map
 
                            .get(&net_todo.endpoint_setup.getter_for_incoming)
 
                            .expect("Net Setup's getter port info isn't known"); // unreachable
 
                        if event.is_writable() && !net_todo.sent_local_port {
 
                            // can write and didn't send setup msg yet? Do so!
 
                            let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo {
 
                                owner: local_info.owner,
 
                                polarity: local_info.polarity,
 
                                port: net_todo.endpoint_setup.getter_for_incoming,
 
                            }));
 
                            net_endpoint
 
                                .send(&msg)
 
                                .map_err(|e| {
 
                                    Ce::NetEndpointSetupError(
 
                                        net_endpoint.stream.local_addr().unwrap(),
 
                                        e,
 
                                    )
 
                                })
 
                                .unwrap();
 
                            log!(logger, "endpoint[{}] sent msg {:?}", index, &msg);
 
                            net_todo.sent_local_port = true;
 
                        }
 
                        if event.is_readable() && net_todo.recv_peer_port.is_none() {
 
                            // can read and didn't recv setup msg yet? Do so!
 
                            // can read and didn't finish recving setup msg yet? Do so!
 
                            let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
 
                                Ce::NetEndpointSetupError(
 
                                    net_endpoint.stream.local_addr().unwrap(),
 
                                    e,
 
                                )
 
                            })?;
 
                            if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() {
 
                                net_polled_undrained.insert(index);
 
                            }
 
                            match maybe_msg {
 
                                None => {} // msg deserialization incomplete
 
                                Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => {
 
                                    log!(
 
                                        logger,
 
                                        "endpoint[{}] got peer info {:?}",
 
                                        index,
 
                                        peer_info
 
                                    );
 
                                    if peer_info.polarity == local_info.polarity {
 
                                        return Err(ConnectError::PortPeerPolarityMismatch(
 
                                            net_todo.endpoint_setup.getter_for_incoming,
 
                                        ));
 
                                    }
 
                                    net_todo.recv_peer_port = Some(peer_info.port);
 
                                    // 1. finally learned the peer of this port!
 
                                    local_info.peer = Some(peer_info.port);
 
                                    // 2. learned the info of this peer port
 
                                    port_info.entry(peer_info.port).or_insert(PortInfo {
 
                                    // finally learned the peer of this port!
 
                                    extra_port_info.peers.insert(
 
                                        net_todo.endpoint_setup.getter_for_incoming,
 
                                        peer_info.port,
 
                                    );
 
                                    // learned the info of this peer port
 
                                    if !port_info.map.contains_key(&peer_info.port) {
 
                                        let info = PortInfo {
 
                                            peer: Some(net_todo.endpoint_setup.getter_for_incoming),
 
                                            polarity: peer_info.polarity,
 
                                            owner: peer_info.owner,
 
                                            route: Route::NetEndpoint { index },
 
                                    });
 
                                        };
 
                                        extra_port_info.info.insert(peer_info.port, info);
 
                                    }
 
                                }
 
                                Some(inappropriate_msg) => {
 
                                    log!(
 
                                        logger,
 
                                        "delaying msg {:?} during channel setup phase",
 
                                        inappropriate_msg
 
                                    );
 
                                    delayed_messages.push((index, inappropriate_msg));
 
                                }
 
                            }
 
                        }
 
                        // is the setup for this net_endpoint now complete?
 
                        if net_todo.sent_local_port && net_todo.recv_peer_port.is_some() {
 
                            // yes! connected, sent my info and received peer's info
 
                            setup_incomplete.remove(&token_target);
 
                            log!(logger, "endpoint[{}] is finished!", index);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        events.clear();
 
    }
 
    log!(logger, "Endpoint setup complete! Cleaning up and building structures");
 
    if let Some(ws) = waker_state.take() {
 
        ws.waker_stop();
 
    }
 
    let net_endpoint_exts = net_todos
 
        .into_iter()
 
        .enumerate()
 
        .map(|(index, Todo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
 
        .map(|(index, NetTodo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt {
 
            net_endpoint: match todo_endpoint {
 
                TodoEndpoint::NetEndpoint(mut net_endpoint) => {
 
                NetTodoEndpoint::PeerInfoRecving(mut net_endpoint) => {
 
                    let token = TokenTarget::NetEndpoint { index }.into();
 
                    poll.registry()
 
                        .reregister(&mut net_endpoint.stream, token, Interest::READABLE)
 
                        .unwrap();
 
                    net_endpoint
 
                }
 
                _ => unreachable!(),
 
            },
 
            getter_for_incoming: endpoint_setup.getter_for_incoming,
 
        })
 
        .collect();
 
    let udp_endpoint_exts = udp_todos
 
@@ -568,54 +603,65 @@ fn new_endpoint_manager(
 
        .map(|(index, udp_todo)| {
 
            let UdpTodo { mut sock, getter_for_incoming } = udp_todo;
 
            let token = TokenTarget::UdpEndpoint { index }.into();
 
            poll.registry().reregister(&mut sock, token, Interest::READABLE).unwrap();
 
            UdpEndpointExt {
 
                sock,
 
                outgoing_payloads: Default::default(),
 
                received_this_round: false,
 
                getter_for_incoming,
 
            }
 
        })
 
        .collect();
 
    Ok(EndpointManager {
 
    let endpoint_manager = EndpointManager {
 
        poll,
 
        events,
 
        undelayed_messages: delayed_messages, // no longer delayed
 
        delayed_messages: Default::default(),
 
        net_endpoint_store: EndpointStore {
 
            endpoint_exts: net_endpoint_exts,
 
            polled_undrained: net_polled_undrained,
 
        },
 
        udp_endpoint_store: EndpointStore {
 
            endpoint_exts: udp_endpoint_exts,
 
            polled_undrained: udp_polled_undrained,
 
        },
 
        udp_in_buffer: Default::default(),
 
    })
 
    };
 
    Ok((endpoint_manager, extra_port_info))
 
}
 

	
 
// Given a fully-formed endpoint manager,
 
// construct the consensus tree with:
 
// 1. decentralized leader election
 
// 2. centralized tree construction
 
fn init_neighborhood(
 
    connector_id: ConnectorId,
 
    logger: &mut dyn Logger,
 
    em: &mut EndpointManager,
 
    deadline: &Option<Instant>,
 
) -> Result<Neighborhood, ConnectError> {
 
    ////////////////////////////////
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 

	
 
    // storage structure for the state of a distributed wave
 
    // (for readability)
 
    #[derive(Debug)]
 
    struct WaveState {
 
        parent: Option<usize>,
 
        leader: ConnectorId,
 
    }
 

	
 
    // kick off a leader-election wave rooted at myself
 
    // given the desired wave information
 
    // (e.g. don't inform my parent if they exist)
 
    fn do_wave(
 
        em: &mut EndpointManager,
 
        awaiting: &mut HashSet<usize>,
 
        ws: &WaveState,
 
    ) -> Result<(), ConnectError> {
 
        awaiting.clear();
 
        let msg = S(Sm::LeaderWave { wave_leader: ws.leader });
 
        for index in em.index_iter() {
 
            if Some(index) != ws.parent {
 
                em.send_to_setup(index, &msg)?;
 
                awaiting.insert(index);
 
            }
 
@@ -652,24 +698,26 @@ fn init_neighborhood(
 
        // initially: No parent, I'm the best leader.
 
        let mut best_wave = WaveState { parent: None, leader: connector_id };
 
        // start a wave for this initial state
 
        do_wave(em, &mut awaiting, &best_wave)?;
 
        // with 1+ neighbors, progress is only made in response to incoming messages
 
        em.undelay_all();
 
        'election: loop {
 
            log!(logger, "Election loop. awaiting {:?}...", awaiting.iter());
 
            let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
            log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(Sm::LeaderAnnounce { tree_leader }) => {
 
                    // A neighbor explicitly tells me who is the leader
 
                    // they become my parent, and I adopt their announced leader
 
                    let election_result =
 
                        WaveState { leader: tree_leader, parent: Some(recv_index) };
 
                    log!(logger, "Election lost! Result {:?}", &election_result);
 
                    assert!(election_result.leader >= best_wave.leader);
 
                    assert_ne!(election_result.leader, connector_id);
 
                    break 'election election_result;
 
                }
 
                S(Sm::LeaderWave { wave_leader }) => {
 
                    use Ordering as O;
 
                    match wave_leader.cmp(&best_wave.leader) {
 
                        O::Less => log!(
 
                            logger,
 
@@ -725,94 +773,103 @@ fn init_neighborhood(
 
                }
 
                msg @ S(Sm::SessionScatter { .. })
 
                | msg @ S(Sm::SessionGather { .. })
 
                | msg @ Msg::CommMsg { .. } => {
 
                    log!(logger, "delaying msg {:?} during election algorithm", msg);
 
                    em.delayed_messages.push((recv_index, msg));
 
                }
 
            }
 
        }
 
    };
 

	
 
    // starting algorithm 2. Send a message to every neighbor
 
    // namely, send "YouAreMyParent" to parent (if they exist),
 
    // and LeaderAnnounce to everyone else
 
    log!(logger, "Starting tree construction. Step 1: send one msg per neighbor");
 
    awaiting.clear();
 
    for index in em.index_iter() {
 
        if Some(index) == election_result.parent {
 
            em.send_to_setup(index, &S(Sm::YouAreMyParent))?;
 
        } else {
 
            awaiting.insert(index);
 
            em.send_to_setup(
 
                index,
 
                &S(Sm::LeaderAnnounce { tree_leader: election_result.leader }),
 
            )?;
 
        }
 
    }
 
    // Receive one message from each neighbor to learn
 
    // whether they consider me their parent or not.
 
    let mut children = vec![];
 
    em.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(logger, "Tree construction_loop loop. awaiting {:?}...", awaiting.iter());
 
        let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?;
 
        log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(Sm::LeaderAnnounce { .. }) => {
 
                // not a child
 
                // `recv_index` is not my child
 
                log!(
 
                    logger,
 
                    "Got reply from non-child index {:?}. Children: {:?}",
 
                    recv_index,
 
                    children.iter()
 
                );
 
                if !awaiting.remove(&recv_index) {
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
            }
 
            S(Sm::YouAreMyParent) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        logger,
 
                        "Got reply from child index {:?}. Children before... {:?}",
 
                        recv_index,
 
                        children.iter()
 
                    );
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                // `recv_index` is my child
 
                children.push(recv_index);
 
            }
 
            msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => {
 
                log!(logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(Sm::SessionScatter { .. })
 
            | msg @ S(Sm::SessionGather { .. })
 
            | msg @ Msg::CommMsg { .. } => {
 
                log!(logger, "delaying msg {:?} during election", msg);
 
                em.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    // Neighborhood complete!
 
    children.shrink_to_fit();
 
    let neighborhood =
 
        Neighborhood { parent: election_result.parent, children: VecSet::new(children) };
 
    log!(logger, "Neighborhood constructed {:?}", &neighborhood);
 
    Ok(neighborhood)
 
}
 

	
 
// Connectors collect a map of type ConnectorId=>SessionInfo,
 
// representing a global view of the session's state at the leader.
 
// The leader rewrites its contents however they like (currently: nothing happens)
 
// and the map is again broadcasted, for each peer to make their local changes to
 
// reflect the results of the rewrite.
 
fn session_optimize(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    deadline: &Option<Instant>,
 
) -> Result<(), ConnectError> {
 
    ////////////////////////////////////////
 
    use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
 
    ////////////////////////////////////////
 
    log!(cu.logger, "Beginning session optimization");
 
    // populate session_info_map from a message per child
 
    let mut unoptimized_map: HashMap<ConnectorId, SessionInfo> = Default::default();
 
    let mut awaiting: HashSet<usize> = comm.neighborhood.children.iter().copied().collect();
 
    comm.endpoint_manager.undelay_all();
 
    while !awaiting.is_empty() {
 
        log!(
 
            cu.logger,
 
            "Session gather loop. awaiting info from children {:?}...",
 
            awaiting.iter()
 
        );
 
        let (recv_index, msg) =
 
@@ -848,39 +905,39 @@ fn session_optimize(
 
            }
 
            msg @ Msg::CommMsg(..) => {
 
                log!(cu.logger, "delaying msg {:?} during session optimization", msg);
 
                comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    log!(
 
        cu.logger,
 
        "Gathered all children's maps. ConnectorId set is... {:?}",
 
        unoptimized_map.keys()
 
    );
 
    // add my own session info to the map
 
    let my_session_info = SessionInfo {
 
        port_info: cu.ips.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
        endpoint_incoming_to_getter: comm
 
            .endpoint_manager
 
            .net_endpoint_store
 
            .endpoint_exts
 
            .iter()
 
            .map(|ee| ee.getter_for_incoming)
 
            .collect(),
 
    };
 
    unoptimized_map.insert(cu.ips.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 

	
 
    // acquire the optimized info...
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
        log!(cu.logger, "Forwarding gathered info to parent {:?}", parent);
 
        let msg = S(Sm::SessionGather { unoptimized_map });
 
        comm.endpoint_manager.send_to_setup(parent, &msg)?;
 
        'scatter_loop: loop {
 
            log!(
 
                cu.logger,
 
                "Session scatter recv loop. awaiting info from children {:?}...",
 
                awaiting.iter()
 
            );
 
@@ -911,54 +968,64 @@ fn session_optimize(
 
    } else {
 
        // by computing it myself
 
        log!(cu.logger, "I am the leader! I will optimize this session");
 
        leader_session_map_optimize(&mut *cu.logger, unoptimized_map)?
 
    };
 
    log!(
 
        cu.logger,
 
        "Optimized info map is {:?}. Sending to children {:?}",
 
        &optimized_map,
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    // extract my own ConnectorId's entry
 
    let optimized_info =
 
        optimized_map.get(&cu.ips.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    // broadcast the optimized session info to my children
 
    let msg = S(Sm::SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    apply_optimizations(cu, comm, optimized_info)?;
 
    // apply local optimizations
 
    apply_my_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.logger, "Session optimizations applied");
 
    Ok(())
 
}
 

	
 
// Defines the optimization function, consuming an optimized map,
 
// and returning an optimized map.
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    log!(logger, "Session map optimize START");
 
    // currently, it's the identity function
 
    log!(logger, "Session map optimize END");
 
    Ok(unoptimized_map)
 
}
 
fn apply_optimizations(
 

	
 
// Modify the given connector's internals to reflect
 
// the given session info
 
fn apply_my_optimizations(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    let SessionInfo {
 
        proto_components,
 
        port_info,
 
        serde_proto_description,
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // TODO some info which should be read-only can be mutated with the current scheme
 
    // simply overwrite the contents
 
    cu.ips.port_info = port_info;
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in comm
 
        .endpoint_manager
 
        .net_endpoint_store
 
        .endpoint_exts
 
        .iter_mut()
 
        .zip(endpoint_incoming_to_getter)
 
    {
 
        ee.getter_for_incoming = getter;
 
    }
0 comments (0 inline, 0 general)