Changeset - 702ecfb0e5e9
Christopher Esterhuyse - 5 years ago 2020-07-07 11:13:20
christopher.esterhuyse@gmail.com
distinguished SubtreeId (identifying a connector's child in the consensus tree) from Route; with the addition of UDP endpoints, the two no longer coincide
3 files changed with 38 insertions and 22 deletions:
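The crux of the change: a Route identifies where a payload is physically delivered, while a SubtreeId identifies a contributor of solutions to the consensus tree. Before this changeset every endpoint played both roles; with UDP endpoints that is no longer true, since a UDP peer is routable but takes no part in consensus. The two enums as defined in src/runtime/mod.rs after this changeset:

enum Route {
    LocalComponent(ComponentId),
    NetEndpoint { index: usize },
    UdpEndpoint { index: usize }, // routable, but not a consensus participant
}
enum SubtreeId { // note: deliberately no UdpEndpoint variant
    LocalComponent(ComponentId),
    NetEndpoint { index: usize },
}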
src/runtime/communication.rs
 
@@ -4,51 +4,51 @@ use crate::common::*;
 
////////////////
 
#[derive(Default)]
 
struct GetterBuffer {
 
    getters_and_sends: Vec<(PortId, SendPayloadMsg)>,
 
}
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    getter_buffer: GetterBuffer,
 
    deadline: Option<Instant>,
 
}
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>, // native branch is ended iff to_get.is_empty()
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
-    // this pair acts as Route -> HashSet<Predicate> which is friendlier to iteration

+    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
-    subtree_id_to_index: HashMap<Route, usize>,

+    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
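// [editor note] The Vec + HashMap pair above behaves like a map from SubtreeId to a
// solution set: conceptually, solutions_for(id) == &subtree_solutions[subtree_id_to_index[&id]].
// Keeping the sets in a Vec lets the digest logic visit "every subtree except one"
// as a pair of index ranges (see submit_and_digest_subtree_solution below).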
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
    untaken_choice: Option<u16>,
 
    ended: bool,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainInner<'a, K, V>,
 
}
 
struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
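// [editor note, an inference from usage further down rather than from this changeset]
// CyclicDrainer plausibly drains `input` while letting the callback fork entries back
// in via add_input (buffered in `swap` for the next pass) or settle them via
// add_output; see the NondetChoice and SyncBlockEnd arms below.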
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
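// [editor sketch, not part of this changeset] An impl for bool consistent with the
// failure-request usage below: the first call flips false -> true and reports it.
impl ReplaceBoolTrue for bool {
    fn replace_with_true(&mut self) -> bool {
        let was = *self; // remember the prior value
        *self = true; // unconditionally set to true
        !was // true exactly once: on the first call
    }
}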
 
@@ -199,56 +199,60 @@ impl Connector {
 
                cu.logger,
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => {
 
                    branching_proto_components
 
                        .insert(proto_component_id, BranchingProtoComponent::initial(component));
 
                }
 
            }
 
        }
 
        log!(
 
            cu.logger,
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 
 
        // Create temp structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
            solution_storage: {
 
-                let n = std::iter::once(Route::LocalComponent(ComponentId::Native));

+                let n = std::iter::once(SubtreeId::LocalComponent(ComponentId::Native));
 
                let c = cu
 
                    .proto_components
 
                    .keys()
 
-                    .map(|&id| Route::LocalComponent(ComponentId::Proto(id)));

-                let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index });

-                let route_iter = n.chain(c).chain(e);

-                SolutionStorage::new(route_iter)

+                    .map(|&id| SubtreeId::LocalComponent(ComponentId::Proto(id)));

+                let e = comm

+                    .neighborhood

+                    .children

+                    .iter()

+                    .map(|&index| SubtreeId::NetEndpoint { index });

+                let subtree_id_iter = n.chain(c).chain(e);

+                SolutionStorage::new(subtree_id_iter)
 
            },
 
            spec_var_stream: cu.id_manager.new_spec_var_stream(),
 
            getter_buffer: Default::default(),
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.logger, "Round context structure initialized");
 
 
        // Explore all native branches eagerly. Find solutions, buffer messages, etc.
 
        log!(
 
            cu.logger,
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        let native_branch_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.logger, "Native branch spec var is {:?}", native_branch_spec_var);
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
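            // [editor note] zipping with SpecVal::iter_domain() pairs batch i with the
            // i-th spec value, so (via native_branch_spec_var) every native branch gets
            // a distinct predicate; the unreachable!() below relies on this.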
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // assign trues for ports that fire
 
                let firing_ports: HashSet<PortId> =
 
@@ -264,49 +268,49 @@ impl Connector {
 
                        log!(cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
 
                predicate.inserted(native_branch_spec_var, branch_spec_val)
 
            };
 
            log!(cu.logger, "Native branch index={:?} has consistent {:?}", index, &predicate);
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg);
 
                rctx.getter_buffer.putter_add(cu, putter, msg);
 
            }
 
            if to_get.is_empty() {
 
                // this branch is immediately ready to be part of a solution
 
                log!(
 
                    cu.logger,
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.logger,
 
-                    Route::LocalComponent(ComponentId::Native),

+                    SubtreeId::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_branch_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(cu.logger, "Committing to decision {:?}!", &decision);
 
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
@@ -410,49 +414,53 @@ impl Connector {
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.logger, "All proto components are blocked");
 
 
        log!(cu.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            log!(cu.logger, "Decision loop! have {} messages to recv", rctx.getter_buffer.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                let route = cu.port_info.routes.get(&getter);
 
                log!(cu.logger, "Routing msg {:?} to {:?}", &send_payload_msg, &route);
 
                match route {
 
                    None => {
 
                        log!(
 
                            cu.logger,
 
                            "Delivery to getter {:?} msg {:?} failed. Physical route unmapped!",
 
                            getter,
 
                            &send_payload_msg
 
                        );
 
                    }
 
-                    Some(Route::Endpoint { index }) => {

+                    Some(Route::UdpEndpoint { index }) => {

+                        // TODO UDP RECV

+                        todo!()

+                    }

+                    Some(Route::NetEndpoint { index }) => {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(*index, &msg)?;
 
                    }
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                        cu,
 
                        &mut rctx.solution_storage,
 
                        getter,
 
                        &send_payload_msg,
 
                    ),
 
                    Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                rctx,
 
                                proto_component_id,
 
                                getter,
 
                                &send_payload_msg,
 
                            )?;
 
@@ -563,52 +571,53 @@ impl Connector {
 
                                .push((endpoint_index, Msg::CommMsg(comm_msg)));
 
                            continue 'undecided;
 
                        }
 
                    },
 
                };
 
                match comm_msg_contents {
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        let getter = comm.endpoint_manager.net_endpoint_exts[endpoint_index]
 
                            .getter_for_incoming;
 
                        assert!(cu.port_info.polarities.get(&getter) == Some(&Getter));
 
                        log!(
 
                            cu.logger,
 
                            "Msg routed to getter port {:?}. Buffer for recv loop",
 
                            getter,
 
                        );
 
                        rctx.getter_buffer.getter_add(getter, send_payload_msg);
 
                    }
 
                    CommMsgContents::Suggest { suggestion } => {
 
                        // only accept this control msg through a child endpoint
 
                        if comm.neighborhood.children.contains(&endpoint_index) {
 
                            match suggestion {
 
                                Decision::Success(predicate) => {
 
                                    // child solution contributes to local solution
 
                                    log!(cu.logger, "Child provided solution {:?}", &predicate);
 
-                                    let route = Route::Endpoint { index: endpoint_index };

+                                    let subtree_id =

+                                        SubtreeId::NetEndpoint { index: endpoint_index };
 
                                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                                        &mut *cu.logger,
 
-                                        route,

+                                        subtree_id,
 
                                        predicate,
 
                                    );
 
                                }
 
                                Decision::Failure => {
 
                                    match comm.neighborhood.parent {
 
                                        None => {
 
                                            log!(cu.logger, "I decide on my child's failure");
 
                                            break 'undecided Ok(Decision::Failure);
 
                                        }
 
                                        Some(parent) => {
 
                                            log!(cu.logger, "Forwarding failure through my parent endpoint {:?}", parent);
 
                                            if already_requested_failure.replace_with_true() {
 
                                                Self::request_failure(cu, comm, parent)?
 
                                            } else {
 
                                                log!(cu.logger, "Already requested failure");
 
                                            }
 
                                        }
 
                                    }
 
                                }
 
                            }
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Discarding suggestion {:?} from non-child endpoint idx {:?}",
 
@@ -656,52 +665,52 @@ impl BranchingNative {
 
        solution_storage: &mut SolutionStorage,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
    ) {
 
        log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
        let mut draining = HashMap::default();
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = cu.port_info.spec_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                branch.to_get.remove(&getter);
 
                if branch.to_get.is_empty() {
 
                    log!(
 
                        cu.logger,
 
                        "new native solution with {:?} (to_get.is_empty()) with gotten {:?}",
 
                        &predicate,
 
                        &branch.gotten
 
                    );
 
-                    let route = Route::LocalComponent(ComponentId::Native);

+                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
-                        route,

+                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    cu.logger,
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                finished.insert(predicate, branch);
 
                continue;
 
            }
 
            use AllMapperResult as Amr;
 
            match predicate.all_mapper(&send_payload_msg.predicate) {
 
                Amr::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        cu.logger,
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
                        &predicate
 
                    );
 
                    finished.insert(predicate, branch);
 
@@ -783,51 +792,52 @@ impl BranchingProtoComponent {
 
                &blocker,
 
            );
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::NondetChoice { n } => {
 
                    let var = rctx.spec_var_stream.next();
 
                    for val in SpecVal::iter_domain().take(n as usize) {
 
                        let pred = predicate.clone().inserted(var, val);
 
                        let mut branch_n = branch.clone();
 
                        branch_n.untaken_choice = Some(val.0);
 
                        drainer.add_input(pred, branch_n);
 
                    }
 
                }
 
                B::Inconsistent => {
 
                    // branch is inconsistent. throw it away
 
                    drop((predicate, branch));
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for &port in ports.iter() {
 
                        let var = cu.port_info.spec_var_for(port);
 
                        predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                    }
 
                    // submit solution for this component
 
+                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id));
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
-                        Route::LocalComponent(ComponentId::Proto(proto_component_id)),

+                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // move to "blocked"
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntReadMsg(port) => {
 
                    // move to "blocked"
 
                    assert!(!branch.inbox.contains_key(&port));
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
                    let var = cu.port_info.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                    // overwrite assignment
 
                    let var = cu.port_info.spec_var_for(putter);
 
@@ -924,95 +934,95 @@ impl BranchingProtoComponent {
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch.ended && solution_predicate.consistent_with(&branch_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch {
 
            inbox: Default::default(),
 
            state,
 
            ended: false,
 
            untaken_choice: None,
 
        };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
    }
 
}
 
impl SolutionStorage {
 
-    fn new(routes: impl Iterator<Item = Route>) -> Self {

-        let mut subtree_id_to_index: HashMap<Route, usize> = Default::default();

+    fn new(subtree_ids: impl Iterator<Item = SubtreeId>) -> Self {

+        let mut subtree_id_to_index: HashMap<SubtreeId, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
-        for key in routes {

-            subtree_id_to_index.insert(key, subtree_solutions.len());

+        for id in subtree_ids {

+            subtree_id_to_index.insert(id, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
 
    }
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
    }
 
-    fn reset(&mut self, subtree_ids: impl Iterator<Item = Route>) {

+    fn reset(&mut self, subtree_ids: impl Iterator<Item = SubtreeId>) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
        for key in subtree_ids {
 
            self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
            self.subtree_solutions.push(Default::default())
 
        }
 
    }
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            local
 
        })
 
    }
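    // [editor note] iter_new_local_make_old drains new_local lazily, recording each
    // yielded predicate in old_local; judging from how old_local is threaded into
    // elaborate_into_new_local_rec below, this plausibly prevents re-digesting a
    // local solution that was already reported.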
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
-        subtree_id: Route,

+        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
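        // left and right together cover every subtree index except `index` itself,
        // so the new solution is elaborated only against other subtrees' solution sets.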
 
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
        }
 
    }
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut dyn Logger,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
src/runtime/mod.rs
 
@@ -53,49 +53,55 @@ pub(crate) struct SyncProtoContext<'a> {
 
)]
 
struct SpecVar(PortId);
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
struct SpecVal(u16);
 
#[derive(Debug)]
 
struct RoundOk {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 
#[derive(Default)]
 
struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
    vec: Vec<T>,
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum ComponentId {
 
    Native,
 
    Proto(ProtoComponentId),
 
}
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum Route {
 
    LocalComponent(ComponentId),
 
-    Endpoint { index: usize },

+    NetEndpoint { index: usize },

+    UdpEndpoint { index: usize },
 
}
 
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]

+enum SubtreeId {

+    LocalComponent(ComponentId),

+    NetEndpoint { index: usize },

+}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
}
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
enum Decision {
 
    Failure,
 
    Success(Predicate),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
src/runtime/setup.rs
 
@@ -362,49 +362,49 @@ fn new_endpoint_manager(
 
                                    );
 
                                    if peer_info.polarity == local_polarity {
 
                                        return Err(ConnectError::PortPeerPolarityMismatch(
 
                                            todo.local_port,
 
                                        ));
 
                                    }
 
                                    todo.recv_peer_port = Some(peer_info.port);
 
                                    // 1. finally learned the peer of this port!
 
                                    port_info.peers.insert(todo.local_port, peer_info.port);
 
                                    // 2. learned the info of this peer port
 
                                    port_info.polarities.insert(peer_info.port, peer_info.polarity);
 
                                    port_info.peers.insert(peer_info.port, todo.local_port);
 
                                    if let Some(route) = port_info.routes.get(&peer_info.port) {
 
                                        // check just for logging purposes
 
                                        log!(
 
                                            logger,
 
                                            "Special case! Route to peer {:?} already known to be {:?}. Leave untouched",
 
                                            peer_info.port,
 
                                            route
 
                                        );
 
                                    }
 
                                    port_info
 
                                        .routes
 
                                        .entry(peer_info.port)
 
-                                        .or_insert(Route::Endpoint { index });

+                                        .or_insert(Route::NetEndpoint { index });
 
                                }
 
                                Some(inappropriate_msg) => {
 
                                    log!(
 
                                        logger,
 
                                        "delaying msg {:?} during channel setup phase",
 
                                        inappropriate_msg
 
                                    );
 
                                    delayed_messages.push((index, inappropriate_msg));
 
                                }
 
                            }
 
                        }
 
                        // is the setup for this net_endpoint now complete?
 
                        if todo.sent_local_port && todo.recv_peer_port.is_some() {
 
                            // yes! connected, sent my info and received peer's info
 
                            setup_incomplete.remove(&index);
 
                            log!(logger, "endpoint[{}] is finished!", index);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
        events.clear();
 
    }
 
    // all todos must be the NetEndpoint variants! unwrap and collect them