Changeset - c4a5132773ce
Christopher Esterhuyse - 2020-09-23 14:06:29
christopher.esterhuyse@gmail.com
cleanup and even more doc comments
4 files changed with 191 insertions and 109 deletions:
src/runtime/communication.rs
 
@@ -107,97 +107,97 @@ impl<'a, K, V> Drop for MapTempGuard<'a, K, V> {
 
    fn drop(&mut self) {
 
        assert!(self.0.is_empty()); // sanity check
 
    }
 
}
 
impl<'a, K, V> Deref for MapTempGuard<'a, K, V> {
 
    type Target = HashMap<K, V>;
 
    fn deref(&self) -> &<Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> {
 
    fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl Connector {
 
    /// Read the message received by the given port in the previous synchronous round.
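    /// A minimal usage sketch (assumptions: `connector` is connected, its native
    /// component owns getter `port`, and the round-completing method below is named `sync`):
    /// ```ignore
    /// let batch_index = connector.sync(None)?; // complete a round first
    /// let payload = connector.gotten(port)?;   // then read what `port` received
    /// ```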
 
    pub fn gotten(&self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError as Ge;
 
        if let ConnectorPhased::Communication(comm) = &self.phased {
 
            match &comm.round_result {
 
                Err(_) => Err(Ge::PreviousSyncFailed),
 
                Ok(None) => Err(Ge::NoPreviousRound),
 
                Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
            }
 
        } else {
 
            return Err(Ge::NoPreviousRound);
 
        }
 
    }
 
    /// Creates a new, empty synchronous batch for the connector and selects it.
 
    /// Subsequent calls to `put` and `get` will populate the new batch with port operations.
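    /// A sketch of preparing two alternative batches (assumptions: `connector` is
    /// connected, the native component owns getter `g` and putter `p`, and the
    /// round-completing method is named `sync`):
    /// ```ignore
    /// connector.get(g)?;                   // batch 0: expect a message on g
    /// let b1 = connector.next_batch()?;    // create and select batch 1
    /// connector.put(p, payload)?;          // batch 1: send on p instead
    /// let decided = connector.sync(None)?; // index of whichever batch was performed
    /// ```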
 
    pub fn next_batch(&mut self) -> Result<usize, WrongStateError> {
 
        // returns index of new batch
 
        if let ConnectorPhased::Communication(comm) = &mut self.phased {
 
            comm.native_batches.push(Default::default());
 
            Ok(comm.native_batches.len() - 1)
 
        } else {
 
            Err(WrongStateError)
 
        }
 
    }
 

	
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError as Poe;
 
        let Self { unphased: cu, phased } = self;
 
-        let info = cu.current_state.port_info.get(&port).ok_or(Poe::UnknownPolarity)?;
+        let info = cu.ips.port_info.get(&port).ok_or(Poe::UnknownPolarity)?;
 
        if info.owner != cu.native_component_id {
 
            return Err(Poe::PortUnavailable);
 
        }
 
        if info.polarity != expect_polarity {
 
            return Err(Poe::WrongPolarity);
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(Poe::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant
 
                Ok(batch)
 
            }
 
        }
 
    }
 

	
 
    /// Add a `put` operation to the connector's currently-selected synchronous batch.
 
    /// Returns an error if the given port is not owned by the native component,
 
    /// has the wrong polarity, or is already included in the batch.
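    /// For example (a sketch; assumes a connected `connector` and a native-owned putter `p`):
    /// ```ignore
    /// connector.put(p, Payload::from(&b"hello"[..]))?;
    /// connector.put(p, Payload::from(&b"again"[..]))?; // Err(PortOpError::MultipleOpsOnPort)
    /// ```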
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Putter)?;
 
        if batch.to_put.contains_key(&port) {
 
            Err(Poe::MultipleOpsOnPort)
 
        } else {
 
            batch.to_put.insert(port, payload);
 
            Ok(())
 
        }
 
    }
 

	
 
    /// Add a `get` operation to the connector's currently-selected synchronous batch.
 
    /// Returns an error if the given port is not owned by the native component,
 
    /// has the wrong polarity, or is already included in the batch.
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Getter)?;
 
        if batch.to_get.insert(port) {
 
            Ok(())
 
        } else {
 
            Err(Poe::MultipleOpsOnPort)
 
        }
 
    }
 

	
 
    /// Participate in the completion of the next synchronous round, in which
 
    /// the native component will perform the set of prepared operations of exactly one
 
    /// of the synchronous batches. At the end of the procedure, the synchronous
 
    /// batches will be reset to a singleton set whose sole element is an empty, selected batch.
    /// The caller yields control to the connector runtime to facilitate the underlying
 
    /// coordination work until either (a) the round is completed with all components' states
 
@@ -216,317 +216,317 @@ impl Connector {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                match &comm.round_result {
 
                    Err(SyncError::Unrecoverable(e)) => {
 
                        log!(cu.logger(), "Attempted to start sync round, but previous error {:?} was unrecoverable!", e);
 
                        return Err(SyncError::Unrecoverable(e.clone()));
 
                    }
 
                    _ => {}
 
                }
 
                comm.round_result = Self::connected_sync(cu, comm, timeout);
 
                comm.round_index += 1;
 
                match &comm.round_result {
 
                    Ok(None) => unreachable!(),
 
                    Ok(Some(ok_result)) => Ok(ok_result.batch_index),
 
                    Err(sync_error) => Err(sync_error.clone()),
 
                }
 
            }
 
        }
 
    }
 

	
 
    // Attempts to complete the synchronous round for the given
 
    // communication-phased connector structure.
 
    // Modifies components and ports in `cu` IFF the round succeeds.
 
    #[inline]
 
    fn connected_sync(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        timeout: Option<Duration>,
 
    ) -> Result<Option<RoundEndedNative>, SyncError> {
 
        //////////////////////////////////
 
        use SyncError as Se;
 
        //////////////////////////////////
 

	
 
        log!(@MARK, cu.logger(), "sync start {}", comm.round_index);
 
        log!(
 
            cu.logger(),
 
            "~~~ SYNC called with timeout {:?}; starting round {}",
 
            &timeout,
 
            comm.round_index
 
        );
 
        log!(@BENCH, cu.logger(), "");
 

	
 
        // Create separate storages for ports and components stored in `cu`,
 
        // while kicking off the branching of components until the set of
 
        // components entering their synchronous block is finalized in `branching_proto_components`.
 
        // This is the last time cu's components and ports are accessed until the round is decided.
 
-        let mut current_state = cu.current_state.clone();
+        let mut ips = cu.ips.clone();
 
        let mut branching_proto_components =
 
            HashMap::<ComponentId, BranchingProtoComponent>::default();
 
        let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu
 
            .proto_components
 
            .iter()
 
            .map(|(&proto_id, proto)| (proto_id, proto.clone()))
 
            .collect();
 
        log!(cu.logger(), "Nonsync running {} proto components...", unrun_components.len());
 
        // initially, the set of components to run is the set of components stored by `cu`,
 
        // but they are eventually drained into `branching_proto_components`.
 
        // Some components exit first, and others are created and put into `unrun_components`.
 
        while let Some((proto_component_id, mut component)) = unrun_components.pop() {
 
            log!(
 
                cu.logger(),
 
                "Nonsync running proto component with ID {:?}. {} to go after this",
 
                proto_component_id,
 
                unrun_components.len()
 
            );
 
            let (logger, proto_description) = cu.logger_and_protocol_description();
 
            let mut ctx = NonsyncProtoContext {
 
-                current_state: &mut current_state,
+                ips: &mut ips,
 
                logger,
 
                proto_component_id,
 
                unrun_components: &mut unrun_components,
 
            };
 
            let blocker = component.nonsync_run(&mut ctx, proto_description);
 
            log!(
 
                cu.logger(),
 
                "proto component {:?} ran to nonsync blocker {:?}",
 
                proto_component_id,
 
                &blocker
 
            );
 
            use NonsyncBlocker as B;
 
            match blocker {
 
                B::ComponentExit => drop(component),
 
                B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)),
 
                B::SyncBlockStart => assert!(branching_proto_components
 
                    .insert(proto_component_id, BranchingProtoComponent::initial(component))
 
                    .is_none()), // Some(_) returned IFF some component identifier key is overwritten (BAD!)
 
            }
 
        }
 
        log!(
 
            cu.logger(),
 
            "All {} proto components are now done with Nonsync phase",
 
            branching_proto_components.len(),
 
        );
 
        log!(@BENCH, cu.logger(), "");
 

	
 
        // Create temporary structures needed for the synchronous phase of the round
 
        let mut rctx = RoundCtx {
 
-            current_state, // already used previously, now moved into RoundCtx
+            ips, // already used previously, now moved into RoundCtx
 
            solution_storage: {
 
                let subtree_id_iter = {
 
                    // Create an iterator over the identifiers of this
 
                    // connector's children in the _solution tree_.
 
                    // Namely, the native, all locally-managed components,
 
                    // and all this connector's children in the _consensus tree_ (other connectors).
 
                    let n = std::iter::once(SubtreeId::LocalComponent(cu.native_component_id));
 
                    let c = branching_proto_components
 
                        .keys()
 
                        .map(|&cid| SubtreeId::LocalComponent(cid));
 
                    let e = comm
 
                        .neighborhood
 
                        .children
 
                        .iter()
 
                        .map(|&index| SubtreeId::NetEndpoint { index });
 
                    n.chain(c).chain(e)
 
                };
 
                log!(
 
                    cu.logger,
 
                    "Children in subtree are: {:?}",
 
                    DebuggableIter(subtree_id_iter.clone())
 
                );
 
                SolutionStorage::new(subtree_id_iter)
 
            },
 
-            spec_var_stream: cu.current_state.id_manager.new_spec_var_stream(),
+            spec_var_stream: cu.ips.id_manager.new_spec_var_stream(),
 
            payload_inbox: Default::default(), // buffer for in-memory payloads to be handled
 
            deadline: timeout.map(|to| Instant::now() + to),
 
        };
 
        log!(cu.logger(), "Round context structure initialized");
 
        log!(@BENCH, cu.logger(), "");
 

	
 
        // Prepare the branching native component, involving the conversion
 
        // of its synchronous batches (user provided) into speculative branches eagerly.
 
        // As a side effect, send all PUTs with the appropriate predicates.
 
        // Afterwards, each native component's speculative branch finds a local
 
        // solution the moment it's received all the messages it's awaiting.
 
        log!(
 
            cu.logger(),
 
            "Translating {} native batches into branches...",
 
            comm.native_batches.len()
 
        );
 
        // Allocate a single speculative variable to distinguish each native branch.
 
        // This enables native components to have distinct branches with identical
 
        // FIRING variables.
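        // For example (illustrative values): batches 0 and 1 may both assign
        // {spec_var_for(p) => FIRING}; tagging them with {native_spec_var => SpecVal(0)}
        // and {native_spec_var => SpecVal(1)} respectively keeps their predicates distinct.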
 
        let native_spec_var = rctx.spec_var_stream.next();
 
        log!(cu.logger(), "Native branch spec var is {:?}", native_spec_var);
 
        let mut branching_native = BranchingNative { branches: Default::default() };
 
        'native_branches: for ((native_branch, index), branch_spec_val) in
 
            comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
 
        {
 
            let NativeBatch { to_get, to_put } = native_branch;
 
            // compute the solution predicate to associate with this branch.
 
            let predicate = {
 
                let mut predicate = Predicate::default();
 
                // all firing ports have SpecVal::FIRING
 
                let firing_iter = to_get.iter().chain(to_put.keys()).copied();
 
                log!(
 
                    cu.logger(),
 
                    "New native with firing ports {:?}",
 
                    firing_iter.clone().collect::<Vec<_>>()
 
                );
 
                let firing_ports: HashSet<PortId> = firing_iter.clone().collect();
 
                for port in firing_iter {
 
-                    let var = cu.current_state.spec_var_for(port);
+                    let var = cu.ips.spec_var_for(port);
 
                    predicate.assigned.insert(var, SpecVal::FIRING);
 
                }
 
                // all silent ports have SpecVal::SILENT
 
-                for port in cu.current_state.ports_owned_by(cu.native_component_id) {
+                for port in cu.ips.ports_owned_by(cu.native_component_id) {
 
                    if firing_ports.contains(port) {
 
                        // this one is FIRING
 
                        continue;
 
                    }
 
-                    let var = cu.current_state.spec_var_for(*port);
+                    let var = cu.ips.spec_var_for(*port);
 
                    if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
 
                        log!(cu.logger(), "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
 
                        continue 'native_branches;
 
                    }
 
                }
 
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
 
                predicate.inserted(native_spec_var, branch_spec_val)
 
            };
 
            log!(cu.logger(), "Native branch index={:?} has consistent {:?}", index, &predicate);
 
            // send all outgoing messages (by buffering them)
 
            for (putter, payload) in to_put {
 
                let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                log!(
 
                    cu.logger(),
 
                    "Native branch {} sending msg {:?} with putter {:?}",
 
                    index,
 
                    &msg,
 
                    putter
 
                );
 
                // sanity check
 
-                assert_eq!(Putter, cu.current_state.port_info.get(&putter).unwrap().polarity);
+                assert_eq!(Putter, cu.ips.port_info.get(&putter).unwrap().polarity);
 
                rctx.putter_push(cu, putter, msg);
 
            }
 
            let branch = NativeBranch { index, gotten: Default::default(), to_get };
 
            if branch.is_ended() {
 
                // empty to_get set => already corresponds with a component solution
 
                log!(
 
                    cu.logger(),
 
                    "Native submitting solution for batch {} with {:?}",
 
                    index,
 
                    &predicate
 
                );
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    cu,
 
                    SubtreeId::LocalComponent(cu.native_component_id),
 
                    predicate.clone(),
 
                );
 
            }
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 
        // Call to another big method; keep running this round
 
        // until a distributed decision is reached!
 
        log!(cu.logger(), "Searching for decision...");
 
        log!(@BENCH, cu.logger(), "");
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(@MARK, cu.logger(), "got decision!");
 
        log!(cu.logger(), "Committing to decision {:?}!", &decision);
 
        log!(@BENCH, cu.logger(), "");
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger(), &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
                decision: decision.clone(),
 
            }),
 
        });
 
        log!(
 
            cu.logger(),
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        log!(@MARK, cu.logger(), "forwarding decision!");
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // untouched port/component fields of `cu` are NOT overwritten.
 
                // the result is a rollback.
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    // "flatten" branching components, committing the speculation
 
                    // consistent with the predicate decided upon.
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(cid, bpc)| (cid, bpc.collapse_with(&predicate))),
 
                );
 
                // commit changes to ports and id_manager
 
-                cu.current_state = rctx.current_state;
+                cu.ips = rctx.ips;
 
                log!(
 
                    cu.logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                let round_ok = branching_native.collapse_with(&mut *cu.logger(), &predicate);
 
                Ok(Some(round_ok))
 
            }
 
        };
 
        log!(cu.logger(), "Sync round ending! Cleaning up");
 
        log!(@BENCH, cu.logger(), "");
 
        ret
 
    }
 

	
 
    // Once the synchronous round has been started, this procedure
 
    // routes and handles payloads, receives control messages from neighboring connectors,
 
    // checks for timeout, and aggregates solutions until a distributed decision is reached.
 
    // The decision is either a solution (success case) or a distributed timeout rollback (failure case).
 
    // The final possible outcome is an unrecoverable error, which results from some fundamental misbehavior,
 
    // a network channel breaking, etc.
 
    fn sync_reach_decision(
 
        cu: &mut impl CuUndecided,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        // The round is in progress, and now it's just a matter of arriving at a decision.
 
        log!(@MARK, cu.logger(), "decide start");
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            // An unsatisfiable native is the easiest way to detect failure
 
            log!(cu.logger(), "Native starts with no branches! Failure!");
 
            match comm.neighborhood.parent {
 
                Some(parent) => {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger(), "Already requested failure");
 
                    }
 
                }
 
                None => {
 
                    log!(cu.logger(), "No parent. Deciding on failure");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
@@ -535,97 +535,97 @@ impl Connector {
 
        // This is an optimization, avoiding repeated allocation.
 
        let mut pcb_temps_owner = <[HashMap<Predicate, ProtoComponentBranch>; 3]>::default();
 
        let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner);
 
        let mut bn_temp_owner = <HashMap<Predicate, NativeBranch>>::default();
 

	
 
        // first, we run every protocol component to their sync blocker.
 
        // Afterwards we establish a loop invariant: no new decision can be reached
 
        // without handling messages in the buffer or arriving from the network
 
        log!(
 
            cu.logger(),
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { branches } = proto_component;
 
            // must reborrow to constrain the lifetime of pcb_temps to inside the loop
 
            let (swap, pcb_temps) = pcb_temps.reborrow().split_first_mut();
 
            let (blocked, _pcb_temps) = pcb_temps.split_first_mut();
 
            // initially, no protocol components have .ended==true
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, swap.0, blocked.0);
 
            BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?;
 
            // swap the blocked branches back
 
            std::mem::swap(blocked.0, branches);
 
            if branches.is_empty() {
 
                log!(cu.logger(), "{:?} has become inconsistent!", proto_component_id);
 
                if let Some(parent) = comm.neighborhood.parent {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger(), "Already requested failure");
 
                    }
 
                } else {
 
                    log!(cu.logger(), "As the leader, deciding on timeout");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.logger(), "All proto components are blocked");
 
        // ...invariant established!
 

	
 
        log!(cu.logger(), "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // handle all buffered messages, sending them through endpoints / feeding them to components
 
            log!(cu.logger(), "Decision loop! have {} messages to recv", rctx.payload_inbox.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_pop() {
 
                log!(@MARK, cu.logger(), "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg);
 
-                let getter_info = rctx.current_state.port_info.get(&getter).unwrap();
+                let getter_info = rctx.ips.port_info.get(&getter).unwrap();
 
                let cid = getter_info.owner; // the id of the component owning `getter` port
 
                assert_eq!(Getter, getter_info.polarity); // sanity check
 
                log!(
 
                    cu.logger(),
 
                    "Routing msg {:?} to {:?} via {:?}",
 
                    &send_payload_msg,
 
                    getter,
 
                    &getter_info.route
 
                );
 
                match getter_info.route {
 
                    Route::UdpEndpoint { index } => {
 
                        // this is a message sent over the network through a UDP endpoint
 
                        let udp_endpoint_ext =
 
                            &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[index];
 
                        let SendPayloadMsg { predicate, payload } = send_payload_msg;
 
                        log!(cu.logger(), "Delivering to udp endpoint index={}", index);
 
                        // UDP mediator messages are buffered until the end of the round,
 
                        // because they are still speculative
 
                        udp_endpoint_ext.outgoing_payloads.insert(predicate, payload);
 
                    }
 
                    Route::NetEndpoint { index } => {
 
                        // this is a message sent over the network as a control message
 
                        log!(@MARK, cu.logger(), "sending payload");
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        // actually send the message now
 
                        comm.endpoint_manager.send_to_comms(index, &msg)?;
 
                    }
 
                    Route::LocalComponent if cid == cu.native_component_id() => branching_native
 
                        .feed_msg(
 
                            cu,
 
                            rctx,
 
                            getter,
 
                            &send_payload_msg,
 
                            MapTempGuard::new(&mut bn_temp_owner),
 
                        ),
 
                    Route::LocalComponent => {
 
                        // some other component_id routed locally. must be a protocol component!
 
                        if let Some(branching_component) = branching_proto_components.get_mut(&cid)
 
                        {
 
                            // The recipient component is still running!
 
                            // Feed it this message AND run it again until all branches are blocked
 
                            branching_component.feed_msg(
 
                                cu,
 
                                rctx,
 
                                cid,
 
@@ -787,105 +787,105 @@ impl Connector {
 
                                cu.logger(),
 
                                "Discarding announcement {:?} from non-parent endpoint idx {:?}",
 
                                &decision,
 
                                net_index
 
                            );
 
                        }
 
                    }
 
                }
 
            }
 
            log!(cu.logger(), "Endpoint msg recv done");
 
        }
 
    }
 

	
 
    // Send a failure request to my parent in the consensus tree
 
    fn request_failure(
 
        cu: &mut impl CuUndecided,
 
        comm: &mut ConnectorCommunication,
 
        parent: usize,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        log!(cu.logger(), "Forwarding to my parent {:?}", parent);
 
        let suggestion = Decision::Failure;
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { suggestion }),
 
        });
 
        comm.endpoint_manager.send_to_comms(parent, &msg)
 
    }
 
}
 
impl NativeBranch {
 
    fn is_ended(&self) -> bool {
 
        self.to_get.is_empty()
 
    }
 
}
 
impl BranchingNative {
 

	
 
    // Feed the given payload to the native component
 
    // May result in discovering new component solutions,
 
    // or fork speculative branches if the message's predicate
 
    // is MORE SPECIFIC than the branches of the native
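    // For example (illustrative): a branch whose predicate is {x => FIRING} receiving
    // a message predicated on {x => FIRING, y => SILENT} is forked: the fed copy adopts
    // the more specific predicate, while the original remains for other values of y.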
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
        bn_temp: MapTempGuard<'_, Predicate, NativeBranch>,
 
    ) {
 
        log!(cu.logger(), "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
-        assert_eq!(Getter, rctx.current_state.port_info.get(&getter).unwrap().polarity);
+        assert_eq!(Getter, rctx.ips.port_info.get(&getter).unwrap().polarity);
 
        let mut draining = bn_temp;
 
        let finished = &mut self.branches;
 
        std::mem::swap(draining.0, finished);
 
        // Visit all native's branches, and feed those whose current predicates are
 
        // consistent with that of the received message.
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger(), "visiting native branch {:?} with {:?}", &branch, &predicate);
 
-            let var = rctx.current_state.spec_var_for(getter);
+            let var = rctx.ips.spec_var_for(getter);
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch,
 
                // because the resulting branch would have an inconsistent predicate.
 
                // the existing branch asserts the getter port is SILENT
 
                log!(
 
                    cu.logger(),
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                Self::insert_branch_merging(finished, predicate, branch);
 
                continue;
 
            }
 
            // Define a little helper closure over `rctx`
 
            // for feeding the given branch this new payload,
 
            // and submitting any resulting solutions
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                // This branch notes the getter port as "gotten"
 
                branch.to_get.remove(&getter);
 
                if let Some(was) = branch.gotten.insert(getter, send_payload_msg.payload.clone()) {
 
                    // Sanity check. Payload mapping (Predicate,Port) should be unique each round
 
                    assert_eq!(&was, &send_payload_msg.payload);
 
                }
 
                if branch.is_ended() {
 
                    // That was the last message the branch was awaiting!
 
                    // Submitting new component solution.
 
                    log!(
 
                        cu.logger(),
 
                        "new native solution with {:?} is_ended() with gotten {:?}",
 
                        &predicate,
 
                        &branch.gotten
 
                    );
 
                    let subtree_id = SubtreeId::LocalComponent(cu.native_component_id());
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        cu,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                } else {
 
                    // This branch still has ports awaiting their messages
 
                    log!(
 
                        cu.logger(),
 
                        "Fed native {:?} still has to_get {:?}",
 
                        &predicate,
 
                        &branch.to_get
 
                    );
 
                }
 
            };
 
            use AssignmentUnionResult as Aur;
 
@@ -1001,131 +1001,131 @@ impl BranchingNative {
 
        }
 
        panic!("Native had no branches matching pred {:?}", solution_predicate);
 
    }
 
}
 
impl BranchingProtoComponent {
 

	
 
    // Create a singleton-branch branching protocol component as
 
    // speculation begins, with the given protocol state.
 
    fn initial(state: ComponentState) -> Self {
 
        let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false };
 
        Self { branches: hashmap! { Predicate::default() => branch } }
 
    }
 

	
 
    // run all the given branches (cd.input) to their SyncBlocker,
 
    // populating cd.output (by way of CyclicDrainer::cyclic_drain).
 
    // This procedure might lose branches, and it might create new branches.
 
    fn drain_branches_to_blocked(
 
        cd: CyclicDrainer<Predicate, ProtoComponentBranch>,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ComponentId,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        cd.cyclic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                rctx,
 
                predicate: &predicate,
 
                branch_inner: &mut branch.inner,
 
            };
 
            // Run this component's state to the next syncblocker for handling
 
            let blocker = branch.state.sync_run(&mut ctx, cu.proto_description());
 
            log!(
 
                cu.logger(),
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                proto_component_id,
 
                &predicate,
 
                &blocker,
 
            );
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency
 
                B::CouldntReadMsg(port) => {
 
                    // sanity check: `CouldntReadMsg` returned IFF the message is unavailable
 
                    assert!(!branch.inner.inbox.contains_key(&port)); 
 
                    // This branch hit a proper blocker: progress awaits the receipt of some message. Exit the cycle.
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned
 
-                    let var = rctx.current_state.spec_var_for(port);
+                    let var = rctx.ips.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // speculate on the two possible values of `var`. Schedule both branches to be rerun.
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check: The given port indeed has `Putter` polarity
 
-                    assert_eq!(Putter, rctx.current_state.port_info.get(&putter).unwrap().polarity);
+                    assert_eq!(Putter, rctx.ips.port_info.get(&putter).unwrap().polarity);
 
                    // assign FIRING to this port's associated firing variable
 
-                    let var = rctx.current_state.spec_var_for(putter);
+                    let var = rctx.ips.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        // Discard the branch, as it clearly has contradictory requirements for this value.
 
                        log!(cu.logger(), "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!",
 
                            proto_component_id, putter, var);
 
                        drop((predicate, branch));
 
                    } else {
 
                        // Note that this port has put this round,
 
                        // and assert that this isn't its 2nd time putting this round (otherwise a PDL programming error)
 
                        assert!(branch.inner.did_put_or_get.insert(putter));
 
                        log!(cu.logger(), "Proto component {:?} with pred {:?} putting payload {:?} on port {:?} (using var {:?})",
 
                            proto_component_id, &predicate, &payload, putter, var);
 
                        // Send the given payload (by buffering it).
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.putter_push(cu, putter, msg);
 
                        // Branch can still make progress. Schedule to be rerun
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
                B::SyncBlockEnd => {
 
                    // This branch reached the end of its synchronous block
 
                    // assign all variables of owned ports that DIDN'T fire to SILENT
 
-                    for port in rctx.current_state.ports_owned_by(proto_component_id) {
-                        let var = rctx.current_state.spec_var_for(*port);
+                    for port in rctx.ips.ports_owned_by(proto_component_id) {
+                        let var = rctx.ips.spec_var_for(*port);
 
                        let actually_exchanged = branch.inner.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let speculated_to_fire = val == SpecVal::FIRING;
 
                        if actually_exchanged != speculated_to_fire {
 
                            log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} actually_exchanged={}, speculated_to_fire={}",
 
                                port, var, val, actually_exchanged, speculated_to_fire);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
                            return Ok(());
 
                        }
 
                    }
 
                    // submit solution for this component
 
                    let subtree_id = SubtreeId::LocalComponent(proto_component_id);
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        cu,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // This branch exits the cyclic drain
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::NondetChoice { n } => {
 
                    // This branch requested the creation of a new n-way nondeterministic
 
                    // fork of the branch with a fresh speculative variable.
 
                    // ... allocate a new speculative variable 
 
                    let var = rctx.spec_var_stream.next();
 
                    // ... and for n distinct values, create a new forked branch,
 
                    // and schedule them to be rerun through the cyclic drain.
 
                    for val in SpecVal::iter_domain().take(n as usize) {
 
                        let pred = predicate.clone().inserted(var, val);
 
                        let mut branch_n = branch.clone();
 
                        branch_n.inner.untaken_choice = Some(val.0);
 
                        drainer.add_input(pred, branch_n);
 
                    }
 
                }
 
            }
 
            Ok(())
 
        })
 
    }
 

	
 
    // Feed this branching protocol component the given message, and
 
    // then run all branches until they are once again blocked. 
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut impl CuUndecided,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ComponentId,
 
@@ -1317,155 +1317,155 @@ impl SolutionStorage {
 
            // iterator over SETS of solutions, one for every component except `subtree_id` (me)
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            // Recursively enumerate all solutions matching the description above,
 
            Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local);
 
        }
 
    }
 

	
 
    // Recursively build local solutions for this connector,
 
    // see `submit_and_digest_subtree_solution`
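    // For example (illustrative): given child solution sets {{x=FIRING}} and
    // {{y=FIRING}, {y=SILENT}}, the recursion enumerates the consistent unions
    // {x=FIRING, y=FIRING} and {x=FIRING, y=SILENT}, keeping those not seen before.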
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        cu: &mut impl CuUndecided,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep recursively creating combined solutions
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        cu,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. This is a solution for this connector...
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(cu.logger(), "storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
 
            }
 
        }
 
    }
 
}
 
impl NonsyncProtoContext<'_> {
 
    // Facilitates callback from the component to the connector runtime,
 
    // creating a new component and changing the given ports' ownership to that
 
    // of the new component.
 
    pub(crate) fn new_component(&mut self, moved_ports: HashSet<PortId>, state: ComponentState) {
 
        // Sanity check! The moved ports are owned by this component to begin with
 
        for port in moved_ports.iter() {
 
            assert_eq!(
 
                self.proto_component_id,
 
-                self.current_state.port_info.get(port).unwrap().owner
+                self.ips.port_info.get(port).unwrap().owner
 
            );
 
        }
 
        // Create the new component, and schedule it to be run
 
-        let new_cid = self.current_state.id_manager.new_component_id();
+        let new_cid = self.ips.id_manager.new_component_id();
 
        log!(
 
            self.logger,
 
            "Component {:?} added new component {:?} with state {:?}, moving ports {:?}",
 
            self.proto_component_id,
 
            new_cid,
 
            &state,
 
            &moved_ports
 
        );
 
        self.unrun_components.push((new_cid, state));
 
        // Update the ownership of the moved ports
 
        for port in moved_ports.iter() {
 
-            self.current_state.port_info.get_mut(port).unwrap().owner = new_cid;
+            self.ips.port_info.get_mut(port).unwrap().owner = new_cid;
 
        }
 
    }
 

	
 
    // Facilitates callback from the component to the connector runtime,
 
    // creating a new port-pair connected by a memory channel
 
    pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] {
 
        // adds two new associated ports, related to each other, and exposed to the proto component
 
-        let mut new_cid_fn = || self.current_state.id_manager.new_port_id();
+        let mut new_cid_fn = || self.ips.id_manager.new_port_id();
 
        let [o, i] = [new_cid_fn(), new_cid_fn()];
 
-        self.current_state.port_info.insert(
+        self.ips.port_info.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(i),
 
                polarity: Putter,
 
                owner: self.proto_component_id,
 
            },
 
        );
 
-        self.current_state.port_info.insert(
+        self.ips.port_info.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(o),
 
                polarity: Getter,
 
                owner: self.proto_component_id,
 
            },
 
        );
 
        log!(
 
            self.logger,
 
            "Component {:?} port pair (out->in) {:?} -> {:?}",
 
            self.proto_component_id,
 
            o,
 
            i
 
        );
 
        [o, i]
 
    }
 
}
 
impl SyncProtoContext<'_> {
 
    // The component calls the runtime back, inspecting whether its associated
    // predicate has already determined a (speculative) value for the given port's firing variable.
 
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
 
-        let var = self.rctx.current_state.spec_var_for(port);
+        let var = self.rctx.ips.spec_var_for(port);
 
        self.predicate.query(var).map(SpecVal::is_firing)
 
    }
 

	
 
    // The component calls the runtime back, trying to inspect a port's message
 
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
 
        let maybe_msg = self.branch_inner.inbox.get(&port);
 
        if maybe_msg.is_some() {
 
            // Make a note that this component has received
 
            // this port's message 1+ times this round
 
            self.branch_inner.did_put_or_get.insert(port);
 
        }
 
        maybe_msg
 
    }
 

	
 
    // NOT CURRENTLY USED
 
    // Once this component has injected a new nondeterministic branch with
 
    // SyncBlocker::NondetChoice, this is how the component retrieves it.
 
    // (Two step process necessary to get around mutable access rules,
 
    //  as injection of the nondeterministic choice modifies the
 
    //  branch predicate, forks the branch, etc.)
 
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
 
        self.branch_inner.untaken_choice.take()
 
    }
 
}
 
impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {
 
    fn new(
 
        input: &'a mut HashMap<K, V>,
 
        swap: &'a mut HashMap<K, V>,
 
        output: &'a mut HashMap<K, V>,
 
    ) -> Self {
 
        Self { input, inner: CyclicDrainerInner { swap, output } }
 
    }
 

	
 
    // This hides the ugliness of facilitating a memory-safe cyclic drain.
 
    // A "drain" would refer to a procedure that empties the input and populates the output.
 
    // It's "cyclic" because the processing function can also populate the input.
 
    // Making this memory safe requires an additional temporary storage, such that
 
    // the input can safely be drained and populated concurrently.
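    // A minimal usage sketch (hypothetical u32 keys and unit values; the real caller
    // drains Predicate->branch maps, as in `drain_branches_to_blocked`):
    //   let (mut input, mut swap, mut output) = <(HashMap<u32, ()>, _, _)>::default();
    //   input.insert(3, ());
    //   let _ = CyclicDrainer::new(&mut input, &mut swap, &mut output)
    //       .cyclic_drain::<()>(|k, v, mut inner| {
    //           if k > 0 { inner.add_input(k - 1, v) } else { inner.add_output(k, v) }
    //           Ok(())
    //       });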
 
    fn cyclic_drain<E>(
 
        self,
 
        mut func: impl FnMut(K, V, CyclicDrainerInner<'_, K, V>) -> Result<(), E>,
 
    ) -> Result<(), E> {
 
        let Self { input, inner: CyclicDrainerInner { swap, output } } = self;
 
        while !input.is_empty() {
 
            for (k, v) in input.drain() {
 
                // func is the user-provided callback function, which consumes an element
 
                // as it's drained from the input
 
                func(k, v, CyclicDrainerInner { swap, output })?
src/runtime/endpoints.rs
 
@@ -192,97 +192,97 @@ impl EndpointManager {
 
                    },
 
                };
 
                match comm_msg_contents {
 
                    CommMsgContents::CommCtrl(comm_ctrl_msg) => Some((net_index, comm_ctrl_msg)),
 
                    CommMsgContents::SendPayload(send_payload_msg) => {
 
                        let getter =
 
                            self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming;
 
                        rctx.getter_push(getter, send_payload_msg);
 
                        *some_message_enqueued = true;
 
                        None
 
                    }
 
                }
 
            }
 
        }
 
        use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use};
 
        ///////////////////////////////////////////
 
        let mut some_message_enqueued = false;
 
        // try yield undelayed net message
 
        while let Some((net_index, msg)) = self.undelayed_messages.pop() {
 
            if let Some((net_index, msg)) =
 
                self.handle_msg(cu, rctx, net_index, msg, round_index, &mut some_message_enqueued)
 
            {
 
                return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
            }
 
        }
 
        loop {
 
            // try receive a net message
 
            while let Some((net_index, msg)) = self.try_recv_undrained_net(cu.logger())? {
 
                if let Some((net_index, msg)) = self.handle_msg(
 
                    cu,
 
                    rctx,
 
                    net_index,
 
                    msg,
 
                    round_index,
 
                    &mut some_message_enqueued,
 
                ) {
 
                    return Ok(CommRecvOk::NewControlMsg { net_index, msg });
 
                }
 
            }
 
            // try receive a udp message
 
            let recv_buffer = self.udp_in_buffer.as_mut_slice();
 
            while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() {
 
                let ee = &mut self.udp_endpoint_store.endpoint_exts[index];
 
                if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() {
 
                    // I received a payload!
 
                    self.udp_endpoint_store.polled_undrained.insert(index);
 
                    if !ee.received_this_round {
 
                        let payload = Payload::from(&recv_buffer[..bytes_written]);
 
-                        let port_spec_var = rctx.current_state.spec_var_for(ee.getter_for_incoming);
+                        let port_spec_var = rctx.ips.spec_var_for(ee.getter_for_incoming);
 
                        let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING);
 
                        rctx.getter_push(
 
                            ee.getter_for_incoming,
 
                            SendPayloadMsg { payload, predicate },
 
                        );
 
                        some_message_enqueued = true;
 
                        ee.received_this_round = true;
 
                    } else {
 
                        // lose the message!
 
                    }
 
                }
 
            }
 
            if some_message_enqueued {
 
                return Ok(CommRecvOk::NewPayloadMsgs);
 
            }
 
            // poll if time remains
 
            match self.poll_and_populate(cu.logger(), &rctx.deadline) {
 
                Ok(()) => {} // continue looping
 
                Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew),
 
                Err(Pape::PollFailed) => return Err(Use::PollFailed),
 
            }
 
        }
 
    }
 
    fn try_recv_undrained_net(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
    ) -> Result<Option<(usize, Msg)>, TryRecvAnyNetError> {
 
        while let Some(index) = self.net_endpoint_store.polled_undrained.pop() {
 
            let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
 
            if let Some(msg) = net_endpoint
 
                .try_recv(logger)
 
                .map_err(|error| TryRecvAnyNetError { error, index })?
 
            {
 
                log!(@ENDPT, logger, "RECV polled_undrained {:?}", &msg);
 
                if !net_endpoint.inbox.is_empty() {
 
                    // there may be another message waiting!
 
                    self.net_endpoint_store.polled_undrained.insert(index);
 
                }
 
                return Ok(Some((index, msg)));
 
            }
 
        }
 
        Ok(None)
 
    }
 
    fn poll_and_populate(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        deadline: &Option<Instant>,
 
    ) -> Result<(), PollAndPopulateError> {
src/runtime/mod.rs
 
@@ -6,793 +6,877 @@ pub mod error;
 
/// cbindgen:ignore
 
mod logging;
 
/// cbindgen:ignore
 
mod setup;
 

	
 
#[cfg(test)]
 
mod tests;
 

	
 
use crate::common::*;
 
use error::*;
 
use mio::net::UdpSocket;
 

	
 
/// The interface between the user's application and a communication session,
 
/// in which the application plays the part of a (native) component. This structure provides the application
 
/// with functionality available to all components: the ability to add new channels (port pairs), and to
 
/// instantiate new components whose definitions are given in the connector's configured protocol
 
/// description. Native components have the additional ability to add `dangling' ports backed by local/remote
 
/// IP addresses, to be coupled with a counterpart once the connector's setup is completed by `connect`.
 
/// This allows sets of applications to cooperate in constructing shared sessions that span the network.
 
#[derive(Debug)]
 
pub struct Connector {
 
    unphased: ConnectorUnphased,
 
    phased: ConnectorPhased,
 
}
 

	
 
/// Characterizes a type which can write lines of logging text.
 
/// The implementations provided in the `logging` module are likely to be sufficient,
 
/// but for added flexibility, users are able to implement their own loggers for use
 
/// by connectors.
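/// A custom logger might look like the following sketch (`BufLogger` is a hypothetical
/// name; `Vec<u8>` already implements `std::io::Write`):
/// ```ignore
/// #[derive(Debug)]
/// struct BufLogger(Vec<u8>);
/// impl Logger for BufLogger {
///     fn line_writer(&mut self) -> Option<&mut dyn std::io::Write> {
///         Some(&mut self.0)
///     }
/// }
/// ```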
 
pub trait Logger: Debug + Send + Sync {
 
    fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>;
 
}
 

	
 
/// A logger that appends the logged strings to a growing byte buffer
 
#[derive(Debug)]
 
pub struct VecLogger(ConnectorId, Vec<u8>);
 

	
 
/// A trivial logger that always returns None, such that no logging information is ever written.
 
#[derive(Debug)]
 
pub struct DummyLogger;
 

	
 
/// A logger that writes the logged lines to a given file.
 
#[derive(Debug)]
 
pub struct FileLogger(ConnectorId, std::fs::File);
 

	
 
// Interface between protocol state and the connector runtime BEFORE all components
 
// have begun their branching speculation. See ComponentState::nonsync_run.
 
pub(crate) struct NonsyncProtoContext<'a> {
 
-    current_state: &'a mut CurrentState,
+    ips: &'a mut IdAndPortState,
 
    logger: &'a mut dyn Logger,
 
    unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase
 
    proto_component_id: ComponentId,                              // KEY in id->component map
 
}
 

	
 
// Interface between protocol state and the connector runtime AFTER all components
 
// have begun their branching speculation. See ComponentState::sync_run.
 
pub(crate) struct SyncProtoContext<'a> {
 
    rctx: &'a RoundCtx,
 
    branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch
 
    predicate: &'a Predicate,                        // KEY in pred->branch map
 
}
 

	
 
// The data coupled with a particular protocol component branch, but crucially omitting
 
// the `ComponentState` such that this may be passed by reference to the state with separate
 
// access control.
 
#[derive(Default, Debug, Clone)]
 
struct ProtoComponentBranchInner {
 
    untaken_choice: Option<u16>,
 
    did_put_or_get: HashSet<PortId>,
 
    inbox: HashMap<PortId, Payload>,
 
}
 

	
 
// A speculative variable that lives for the duration of the synchronous round.
 
// Each is assigned a value in domain `SpecVal`.
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
struct SpecVar(PortId);
 

	
 
// The codomain of SpecVal. Has two associated constants for values FIRING and SILENT,
 
// but may also enumerate many more values to facilitate finer-grained nondeterministic branching.
 
#[derive(
 
    Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize,
 
)]
 
struct SpecVal(u16);
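// For example (illustrative), a predicate's assignments might read
//   { SpecVar(p) => SpecVal::FIRING, SpecVar(q) => SpecVal::SILENT },
// recording that port p exchanged a message this round while port q stayed silent.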
 

	
 
// Data associated with a successful synchronous round, retained afterwards such that the
 
// native component can freely reflect on how it went, reading the messages received at its
// inputs, and noting which of its connector's synchronous batches succeeded.
 
#[derive(Debug)]
 
struct RoundEndedNative {
 
    batch_index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
}
 

	
 
// Implementation of a set in terms of a vector (optimized for reading, not writing)
 
#[derive(Default)]
 
struct VecSet<T: std::cmp::Ord> {
 
    // invariant: ordered, deduplicated
 
    vec: Vec<T>,
 
}
 

	
 
// Allows a connector to remember how to forward payloads towards the component that
 
// owns their destination port. `LocalComponent` corresponds with messages for components
 
// managed by the connector itself (hinting for it to look it up in a local structure),
 
// whereas the other variants direct the connector to forward the messages over the network.
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum Route {
 
    LocalComponent,
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
}
 

	
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
    owner: ComponentId,
 
}
 
// The outcome of a synchronous round, representing the distributed consensus.
 
// In the success case, the attached predicate encodes a row in the session's trace table.
 
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
 
enum Decision {
 
    Failure,
 
    Failure, // some connector timed out!
 
    Success(Predicate),
 
}
 

	
 
// The type of control messages exchanged between connectors over the network
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum Msg {
 
    SetupMsg(SetupMsg),
 
    CommMsg(CommMsg),
 
}
 

	
 
// Control messages exchanged during the setup phase only
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum SetupMsg {
 
    MyPortInfo(MyPortInfo),
 
    LeaderWave { wave_leader: ConnectorId },
 
    LeaderAnnounce { tree_leader: ConnectorId },
 
    YouAreMyParent,
 
    SessionGather { unoptimized_map: HashMap<ConnectorId, SessionInfo> },
 
    SessionScatter { optimized_map: HashMap<ConnectorId, SessionInfo> },
 
}
 

	
 
// A data structure encoding the state of a connector, passed around
 
// during the session optimization procedure.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SessionInfo {
 
    serde_proto_description: SerdeProtocolDescription,
 
    port_info: HashMap<PortId, PortInfo>,
 
    endpoint_incoming_to_getter: Vec<PortId>,
 
    proto_components: HashMap<ComponentId, ComponentState>,
 
}
 

	
 
// Newtype wrapper for an Arc<ProtocolDescription>,
 
// such that it can be (de)serialized for transmission over the network.
 
#[derive(Debug, Clone)]
 
struct SerdeProtocolDescription(Arc<ProtocolDescription>);
 

	
 
// Control message particular to the communication phase.
// As such, it is annotated with a round_index.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct CommMsg {
 
    round_index: usize,
 
    contents: CommMsgContents,
 
}
 

	
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommMsgContents {
 
    SendPayload(SendPayloadMsg),
 
    CommCtrl(CommCtrlMsg),
 
}
 

	
 
// Connector <-> connector control messages for use in the communication phase
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
enum CommCtrlMsg {
 
    Suggest { suggestion: Decision }, // SINKWARD
 
    Announce { decision: Decision },  // SINKAWAYS
 
    Suggest { suggestion: Decision }, // child->parent
 
    Announce { decision: Decision },  // parent->child
 
}
 

	
 
// Speculative payload message, communicating the payload for the given
// port, predicated on the given speculative variable assignments.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct SendPayloadMsg {
 
    predicate: Predicate,
 
    payload: Payload,
 
}
 

	
 
// The return type of `Predicate::assignment_union`, communicating the contents
// of the predicate which represents the (consistent) union of their mappings,
// if it exists (i.e., no variable is mapped distinctly by the input predicates).
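// Worked example, in SpecVar/SpecVal Debug notation (illustrative values):
//   {v0_1=1} U {v0_2=0}         => New({v0_1=1, v0_2=0})
//   {v0_1=1, v0_2=0} U {v0_1=1} => FormerNotLatter (former contains latter)
//   {v0_1=1} U {v0_1=1}         => Equivalent
//   {v0_1=1} U {v0_1=0}         => Nonexistant (v0_1 mapped distinctly)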
 
#[derive(Debug, PartialEq)]
 
enum AssignmentUnionResult {
 
    FormerNotLatter,
 
    LatterNotFormer,
 
    Equivalent,
 
    New(Predicate),
 
    Nonexistant,
 
}
 

	
 
// One of two endpoints for a control channel with a connector on either end.
 
// The underlying transport is TCP, so we use an inbox buffer to allow
 
// discrete payload receipt.
 
struct NetEndpoint {
 
    inbox: Vec<u8>,
 
    stream: TcpStream,
 
}
 

	
 
// Data structure used during the setup phase representing a NetEndpoint TO BE SET UP
 
#[derive(Debug, Clone)]
 
struct NetEndpointSetup {
 
    getter_for_incoming: PortId,
 
    sock_addr: SocketAddr,
 
    endpoint_polarity: EndpointPolarity,
 
}
 

	
 
// Data structure used during the setup phase representing a UdpEndpoint TO BE SET UP
 
#[derive(Debug, Clone)]
 
struct UdpEndpointSetup {
 
    getter_for_incoming: PortId,
 
    local_addr: SocketAddr,
 
    peer_addr: SocketAddr,
 
}
 

	
 
// NetEndpoint annotated with the ID of the port that receives payload
 
// messages received through the endpoint. This approach assumes that NetEndpoints
 
// DO NOT multiplex port->port channels, and so a mapping such as this is possible.
 
// As a result, the messages themselves don't need to carry the PortID with them.
 
#[derive(Debug)]
 
struct NetEndpointExt {
 
    net_endpoint: NetEndpoint,
 
    getter_for_incoming: PortId,
 
}
 

	
 
// Endpoint for a "raw" UDP endpoint. Corresponds to the "Udp Mediator Component"
// described in the literature.
// It acts as an endpoint by receiving messages via the poller etc. (managed by EndpointManager).
// It acts as a native component by managing a (speculative) set of payload messages
// (an outbox, protecting the peer on the other side of the network).
 
#[derive(Debug)]
 
struct UdpEndpointExt {
 
    sock: UdpSocket, // already bound and connected
 
    received_this_round: bool,
 
    outgoing_payloads: HashMap<Predicate, Payload>,
 
    getter_for_incoming: PortId,
 
}
 

	
 
// Meta-data for the connector: its role in the consensus tree.
 
#[derive(Debug)]
 
struct Neighborhood {
 
    parent: Option<usize>,
 
    children: VecSet<usize>,
 
}
 

	
 
// Manages the connector's ID, and allocates fresh port and component IDs.
 
#[derive(Debug, Clone)]
 
struct IdManager {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
    component_suffix_stream: U32Stream,
 
}
 

	
 
// Newtype wrapper around a byte buffer, used for UDP mediators to receive incoming datagrams.
 
struct UdpInBuffer {
 
    byte_vec: Vec<u8>,
 
}
 

	
 
// A generator of speculative variables. Created on-demand during the synchronous round
 
// by the IdManager.
 
#[derive(Debug)]
 
struct SpecVarStream {
 
    connector_id: ConnectorId,
 
    port_suffix_stream: U32Stream,
 
}
 

	
 
// Manages the messy state of the various endpoints, pollers, buffers, etc.
 
#[derive(Debug)]
 
struct EndpointManager {
 
    // invariants:
 
    // 1. net and udp endpoints are registered with poll. Poll token computed with TargetToken::into
 
    // 1. net and udp endpoints are registered with poll with tokens computed with TargetToken::into
 
    // 2. Events is empty
 
    poll: Poll,
 
    events: Events,
 
    delayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>,
 
    undelayed_messages: Vec<(usize, Msg)>, // ready to yield
 
    net_endpoint_store: EndpointStore<NetEndpointExt>,
 
    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
 
    udp_in_buffer: UdpInBuffer,
 
}
 

	
 
// A storage of endpoints, which keeps track of which endpoints have raised
// an event during poll(), signifying that they need to be checked for new incoming data
 
#[derive(Debug)]
 
struct EndpointStore<T> {
 
    endpoint_exts: Vec<T>,
 
    polled_undrained: VecSet<usize>,
 
}
 

	
 
// The information associated with a port identifier, designed for local storage.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct PortInfo {
 
    owner: ComponentId,
 
    peer: Option<PortId>,
 
    polarity: Polarity,
 
    route: Route,
 
}
 

	
 
// Similar to `PortInfo`, but designed for communication during the setup procedure.
 
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 
struct MyPortInfo {
 
    polarity: Polarity,
 
    port: PortId,
 
    owner: ComponentId,
 
}
 

	
 
// A convenient substructure for containing port info and the ID manager.
 
// Houses the bulk of the connector's persistent state between rounds.
 
// It turns out several situations require access to both things.
 
#[derive(Debug, Clone)]
 
struct CurrentState {
 
struct IdAndPortState {
 
    port_info: HashMap<PortId, PortInfo>,
 
    id_manager: IdManager,
 
}
 

	
 
// A connector's communication-phase-specific data
 
#[derive(Debug)]
 
struct ConnectorCommunication {
 
    round_index: usize,
 
    endpoint_manager: EndpointManager,
 
    neighborhood: Neighborhood,
 
    native_batches: Vec<NativeBatch>,
 
    round_result: Result<Option<RoundEndedNative>, SyncError>,
 
}
 

	
 
// A connector's data common to both setup and communication phases
 
#[derive(Debug)]
 
struct ConnectorUnphased {
 
    proto_description: Arc<ProtocolDescription>,
 
    proto_components: HashMap<ComponentId, ComponentState>,
 
    logger: Box<dyn Logger>,
 
    current_state: CurrentState,
 
    ips: IdAndPortState,
 
    native_component_id: ComponentId,
 
}
 

	
 
// A connector's phase-specific data
 
#[derive(Debug)]
 
enum ConnectorPhased {
 
    Setup(Box<ConnectorSetup>),
 
    Communication(Box<ConnectorCommunication>),
 
}
 

	
 
// A connector's setup-phase-specific data
 
#[derive(Debug)]
 
struct ConnectorSetup {
 
    net_endpoint_setups: Vec<NetEndpointSetup>,
 
    udp_endpoint_setups: Vec<UdpEndpointSetup>,
 
}
 

	
 
// A newtype wrapper for a map from speculative variable to speculative value.
// A missing mapping corresponds with "unspecified".
 
#[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
struct Predicate {
 
    assigned: BTreeMap<SpecVar, SpecVal>,
 
}
 

	
 
// Identifies a child of this connector in the _solution tree_.
 
// Each connector creates its own local solutions for the consensus procedure during `sync`,
 
// from the solutions of its children. Those children are either locally-managed components,
 
// (which are leaves in the solution tree), or other connectors reachable through the given
 
// network endpoint (which are internal nodes in the solution tree).
 
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 
enum SubtreeId {
 
    LocalComponent(ComponentId),
 
    NetEndpoint { index: usize },
 
}
 

	
 
// An accumulation of the connector's knowledge of all (a) the local solutions its children
 
// in the solution tree have found, and (b) its own solutions derivable from those of its children.
 
// This structure starts off each round with an empty set, and accumulates solutions as they are found
 
// by local components, or received over the network in control messages.
 
// IMPORTANT: solutions, once found, don't go away until the end of the round. That is to
 
// say that these sets GROW until the round is over, and all solutions are reset.
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    // invariant: old_local U new_local solutions are those that can be created from
    // the UNION of one element from each set in `subtree_solutions`.
    // The invariant is maintained by potentially populating new_local whenever subtree_solutions is populated.
 
    old_local: HashSet<Predicate>, // already sent to this connector's parent OR decided
 
    new_local: HashSet<Predicate>, // not yet sent to this connector's parent OR decided
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 

	
 
// Stores the transient data of a synchronous round.
 
// Some of it is for bookkeeping, and the rest is a temporary mirror of fields of
 
// `ConnectorUnphased`, such that any changes are safely contained within RoundCtx,
 
// and can be undone if the round fails.
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    payload_inbox: Vec<(PortId, SendPayloadMsg)>,
 
    deadline: Option<Instant>,
 
    current_state: CurrentState,
 
    ips: IdAndPortState,
 
}
 

	
 
// A trait intended to limit the access of the ConnectorUnphased structure
 
// such that we don't accidentally modify any important component/port data
 
// while the results of the round are undecided. Why? Any actions during Connector::sync
 
// are _speculative_ until the round is decided, and we need a safe way of rolling
 
// back any changes.
 
trait CuUndecided {
 
    fn logger(&mut self) -> &mut dyn Logger;
 
    fn proto_description(&self) -> &ProtocolDescription;
 
    fn native_component_id(&self) -> ComponentId;
 
    fn logger_and_protocol_description(&mut self) -> (&mut dyn Logger, &ProtocolDescription);
 
}
 

	
 
// Represents a set of synchronous port operations that the native component
 
// has described as an "option" for completing during the synchronous round.
// Operations contained here succeed together or not at all.
// A native with N >= 2 batches expresses an N-way nondeterministic choice.
 
#[derive(Debug, Default)]
 
struct NativeBatch {
 
    // invariant: putters' and getters' polarities respected
 
    to_put: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 

	
 
// Parallels a mio::Token type, but more clearly communicates
 
// the way it identifies the evented structure it corresponds to.
 
// See runtime/setup for methods converting between TokenTarget and mio::Token
 
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
 
enum TokenTarget {
 
    NetEndpoint { index: usize },
 
    UdpEndpoint { index: usize },
 
    Waker,
 
}
 

	
 
// Returned by the endpoint manager as a result of comm_recv, telling the connector what happened,
 
// such that it can know when to continue polling, and when to block.
 
enum CommRecvOk {
 
    TimeoutWithoutNew,
 
    NewPayloadMsgs,
 
    NewControlMsg { net_index: usize, msg: CommCtrlMsg },
 
}
 
////////////////
 
fn err_would_block(err: &std::io::Error) -> bool {
 
    err.kind() == std::io::ErrorKind::WouldBlock
 
}
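// Sketch of the intended use of `err_would_block` (illustrative helper, not part
// of this crate): distinguish "no data available right now" from a real failure
// when reading a nonblocking stream.
fn sketch_try_read(stream: &mut TcpStream, buf: &mut [u8]) -> std::io::Result<Option<usize>> {
    match std::io::Read::read(stream, buf) {
        Ok(n) => Ok(Some(n)),                       // n bytes were ready
        Err(e) if err_would_block(&e) => Ok(None),  // retry after the next poll event
        Err(e) => Err(e),                           // a genuine I/O error
    }
}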
 
impl<T: std::cmp::Ord> VecSet<T> {
 
    fn new(mut vec: Vec<T>) -> Self {
 
        // establish the invariant
 
        vec.sort();
 
        vec.dedup();
 
        Self { vec }
 
    }
 
    fn contains(&self, element: &T) -> bool {
 
        self.vec.binary_search(element).is_ok()
 
    }
 
    // Insert the given element. Returns true iff it was newly inserted (i.e., not already present).
 
    fn insert(&mut self, element: T) -> bool {
 
        match self.vec.binary_search(&element) {
 
            Ok(_) => false,
 
            Err(index) => {
 
                self.vec.insert(index, element);
 
                true
 
            }
 
        }
 
    }
 
    fn iter(&self) -> std::slice::Iter<T> {
 
        self.vec.iter()
 
    }
 
    fn pop(&mut self) -> Option<T> {
 
        self.vec.pop()
 
    }
 
}
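// Usage sketch (hypothetical test) of VecSet's invariant: the backing vector
// stays sorted and deduplicated, so `contains` reduces to a binary search.
#[test]
fn vec_set_sketch() {
    let mut set = VecSet::new(vec![3, 1, 3, 2]); // sorted and deduplicated to [1, 2, 3]
    assert!(set.contains(&2));
    assert!(!set.insert(2)); // already present: no change, returns false
    assert!(set.insert(4)); // newly inserted: returns true
    assert_eq!(set.pop(), Some(4)); // pops the greatest element
}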
 
impl CurrentState {
 
impl IdAndPortState {
 
    fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator<Item = &PortId> {
 
        self.port_info
 
            .iter()
 
            .filter(move |(_, port_info)| port_info.owner == owner)
 
            .map(|(port, _)| port)
 
    }
 
    fn spec_var_for(&self, port: PortId) -> SpecVar {
 
        // Every port maps to a speculative variable
 
        // Two distinct ports map to the same variable
 
        // IFF they are two ends of the same logical channel.
 
        let info = self.port_info.get(&port).unwrap();
 
        SpecVar(match info.polarity {
 
            Getter => port,
 
            Putter => info.peer.unwrap(),
 
        })
 
    }
 
}
 
impl SpecVarStream {
 
    fn next(&mut self) -> SpecVar {
 
        let phantom_port: PortId =
 
            Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }
 
                .into();
 
        SpecVar(phantom_port)
 
    }
 
}
 
impl IdManager {
 
    fn new(connector_id: ConnectorId) -> Self {
 
        Self {
 
            connector_id,
 
            port_suffix_stream: Default::default(),
 
            component_suffix_stream: Default::default(),
 
        }
 
    }
 
    fn new_spec_var_stream(&self) -> SpecVarStream {
 
        // The spec var stream starts where the current port_id stream ends, with a gap of SKIP_N.
        // This gap is entirely unnecessary (i.e. 0 is fine);
        // its purpose is only to make SpecVars easier to spot in logs.
 
        // E.g. spot the spec var: { v0_0, v1_2, v1_103 }
 
        const SKIP_N: u32 = 100;
 
        let port_suffix_stream = self.port_suffix_stream.clone().n_skipped(SKIP_N);
 
        SpecVarStream { connector_id: self.connector_id, port_suffix_stream }
 
    }
 
    fn new_port_id(&mut self) -> PortId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into()
 
    }
 
    fn new_component_id(&mut self) -> ComponentId {
 
        Id { connector_id: self.connector_id, u32_suffix: self.component_suffix_stream.next() }
 
            .into()
 
    }
 
}
 
impl Drop for Connector {
 
    fn drop(&mut self) {
 
        log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!");
 
    }
 
}
 

	
 
// Given a slice of ports, return the first port, if any, that occurs more than once
 
fn duplicate_port(slice: &[PortId]) -> Option<PortId> {
 
    let mut vec = Vec::with_capacity(slice.len());
 
    for port in slice.iter() {
 
        match vec.binary_search(port) {
 
            Err(index) => vec.insert(index, *port),
 
            Ok(_) => return Some(*port),
 
        }
 
    }
 
    None
 
}
 
impl Connector {
 
    /// Generate a random connector identifier from the system's source of randomness.
 
    pub fn random_id() -> ConnectorId {
 
        type Bytes8 = [u8; std::mem::size_of::<ConnectorId>()];
 
        unsafe {
 
            let mut bytes = std::mem::MaybeUninit::<Bytes8>::uninit();
 
            // getrandom is the canonical crate for a small, secure rng
 
            getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap();
 
            // safe! representations of all valid Bytes8 values are valid ConnectorId values
 
            std::mem::transmute::<_, _>(bytes.assume_init())
 
        }
 
    }
 

	
 
    /// Returns true iff the connector is in the connected state, i.e., its setup phase is complete,
    /// and it is ready to participate in synchronous rounds of communication.
    pub fn is_connected(&self) -> bool {
        // If designed for Rust usage, connectors would be exposed as an enum type from the start.
        // Consequently, this "phased" business would also include connector variants, and this would
        // get a lot closer to the connector implementation itself.
        // Instead, the C-oriented implementation doesn't distinguish connector states as types,
        // but as enum variants.
 
        match self.phased {
 
            ConnectorPhased::Setup(..) => false,
 
            ConnectorPhased::Communication(..) => true,
 
        }
 
    }
 

	
 
    /// Enables the connector's current logger to be swapped out for another
 
    pub fn swap_logger(&mut self, mut new_logger: Box<dyn Logger>) -> Box<dyn Logger> {
 
        std::mem::swap(&mut self.unphased.logger, &mut new_logger);
 
        new_logger
 
    }
 

	
 
    /// Access the connector's current logger
 
    pub fn get_logger(&mut self) -> &mut dyn Logger {
 
        &mut *self.unphased.logger
 
    }
 

	
 
    /// Create a new synchronous channel, returning its ends as a pair of ports,
 
    /// with polarity output, input respectively. Available during either the setup or communication phase.
 
    /// # Panics
 
    /// This function panics if the connector's (large) port id space is exhausted.
 
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
 
        let cu = &mut self.unphased;
 
        // adds two new associated ports, related to each other, and exposed to the native
 
        let mut new_cid = || cu.current_state.id_manager.new_port_id();
 
        let mut new_cid = || cu.ips.id_manager.new_port_id();
 
        // allocate two fresh port identifiers
 
        let [o, i] = [new_cid(), new_cid()];
 
        cu.current_state.port_info.insert(
 
        // store info for each:
 
        // - they are each others' peers
 
        // - they are owned by a local component with id `cid`
 
        // - polarity putter, getter respectively
 
        cu.ips.port_info.insert(
 
            o,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(i),
 
                owner: cu.native_component_id,
 
                polarity: Putter,
 
            },
 
        );
 
        cu.current_state.port_info.insert(
 
        cu.ips.port_info.insert(
 
            i,
 
            PortInfo {
 
                route: Route::LocalComponent,
 
                peer: Some(o),
 
                owner: cu.native_component_id,
 
                polarity: Getter,
 
            },
 
        );
 
        log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i);
 
        [o, i]
 
    }
 

	
 
    /// Instantiates a new component for the connector runtime to manage, passing
    /// ownership of the given ports from the native component's interface to that
    /// of the newly created component.
 
    /// # Errors
 
    /// Error is returned if the moved ports are not owned by the native component,
 
    /// if the given component name is not defined in the connector's protocol,
 
    /// the given sequence of ports contains a duplicate port,
 
    /// or if the component is unfit for instantiation with the given port sequence.
 
    /// # Panics
 
    /// This function panics if the connector's (large) component id space is exhausted.
 
    pub fn add_component(
 
        &mut self,
 
        identifier: &[u8],
 
        ports: &[PortId],
 
    ) -> Result<(), AddComponentError> {
 
        // called by the USER. moves ports owned by the NATIVE
 
        // Check for error cases first before modifying `cu`
 
        use AddComponentError as Ace;
 
        // 1. check if this is OK
 
        let cu = &self.unphased;
 
        if let Some(port) = duplicate_port(ports) {
 
            return Err(Ace::DuplicatePort(port));
 
        }
 
        let cu = &mut self.unphased;
 
        let expected_polarities = cu.proto_description.component_polarities(identifier)?;
 
        if expected_polarities.len() != ports.len() {
 
            return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() });
 
        }
 
        for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) {
 
            let info = cu.current_state.port_info.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            let info = cu.ips.port_info.get(&port).ok_or(Ace::UnknownPort(port))?;
 
            if info.owner != cu.native_component_id {
 
                return Err(Ace::UnknownPort(port));
 
            }
 
            if info.polarity != expected_polarity {
 
                return Err(Ace::WrongPortPolarity { port, expected_polarity });
 
            }
 
        }
 
        // 2. add new component
 
        let new_cid = cu.current_state.id_manager.new_component_id();
 
        // No errors! Time to modify `cu`
 
        // create a new component and identifier
 
        let cu = &mut self.unphased;
 
        let new_cid = cu.ips.id_manager.new_component_id();
 
        cu.proto_components
 
            .insert(new_cid, cu.proto_description.new_main_component(identifier, ports));
 
        // 3. update port ownership
 
        // update the ownership of moved ports
 
        for port in ports.iter() {
 
            match cu.current_state.port_info.get_mut(port) {
 
            match cu.ips.port_info.get_mut(port) {
 
                Some(port_info) => port_info.owner = new_cid,
 
                None => unreachable!(),
 
            }
 
        }
 
        Ok(())
 
    }
 
}
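// Hypothetical usage sketch: move both ends of a fresh channel into a newly
// instantiated component. The component name "forward" and its two-port
// signature are assumptions about the loaded protocol description.
fn sketch_add_component(connector: &mut Connector) -> Result<(), AddComponentError> {
    let [o, i] = connector.new_port_pair();
    // the component's expected polarities must match those of [i, o]: [Getter, Putter]
    connector.add_component(b"forward", &[i, o])
}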
 
impl Predicate {
 
    #[inline]
 
    pub fn singleton(k: SpecVar, v: SpecVal) -> Self {
 
        Self::default().inserted(k, v)
 
    }
 
    #[inline]
 
    pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self {
 
        self.assigned.insert(k, v);
 
        self
 
    }
 

	
 
    // Returns true iff `self` is a subset of `maybe_superset`
 
    pub fn assigns_subset(&self, maybe_superset: &Self) -> bool {
 
        for (var, val) in self.assigned.iter() {
 
            match maybe_superset.assigned.get(var) {
 
                Some(val2) if val2 == val => {}
 
                _ => return false, // var unmapped, or mapped differently
 
            }
 
        }
 
        // `maybe_superset` mirrored all my assignments!
 
        true
 
    }
 

	
 
    // returns true IFF self.unify would return Equivalent OR FormerNotLatter
 
    // pub fn consistent_with(&self, other: &Self) -> bool {
 
    //     let [larger, smaller] =
 
    //         if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] };
 

	
 
    //     for (var, val) in smaller.assigned.iter() {
 
    //         match larger.assigned.get(var) {
 
    //             Some(val2) if val2 != val => return false,
 
    //             _ => {}
 
    //         }
 
    //     }
 
    //     true
 
    // }
 

	
 
    /// Given self and other, two predicates, return the predicate whose
 
    /// assignments are the union of those of self and other.
 
    /// Given the two predicates {self, other}, return that whose
 
    /// assignments are the union of those of both.
 
    fn assignment_union(&self, other: &Self) -> AssignmentUnionResult {
 
        use AssignmentUnionResult as Aur;
 
        // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys.
 
        let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()];
 
        let [mut s, mut o] = [s_it.next(), o_it.next()];
 
        // lists of assignments in self but not other and vice versa.
 
        // populate lists of assignments in self but not other and vice versa.
 
        // do this by incrementally unfolding the iterators, keeping an eye
 
        // on the ordering between the head elements [s, o].
 
        // whenever s<o, other is certainly missing element 's', etc.
 
        let [mut s_not_o, mut o_not_s] = [vec![], vec![]];
 
        loop {
 
            match [s, o] {
 
                [None, None] => break,
 
                [None, None] => break, // both iterators are empty
 
                [None, Some(x)] => {
 
                    // self's iterator is empty.
 
                    // all remaining elements are in other but not self
 
                    o_not_s.push(x);
 
                    o_not_s.extend(o_it);
 
                    break;
 
                }
 
                [Some(x), None] => {
 
                    // other's iterator is empty.
 
                    // all remaining elements are in self but not other
 
                    s_not_o.push(x);
 
                    s_not_o.extend(s_it);
 
                    break;
 
                }
 
                [Some((sid, sb)), Some((oid, ob))] => {
 
                    if sid < oid {
 
                        // o is missing this element
 
                        s_not_o.push((sid, sb));
 
                        s = s_it.next();
 
                    } else if sid > oid {
 
                        // s is missing this element
 
                        o_not_s.push((oid, ob));
 
                        o = o_it.next();
 
                    } else if sb != ob {
 
                        assert_eq!(sid, oid);
 
                        // both predicates assign the variable but differ on the value
 
                        // No predicate exists which satisfies both!
 
                        return Aur::Nonexistant;
 
                    } else {
 
                        // both predicates assign the variable to the same value
 
                        s = s_it.next();
 
                        o = o_it.next();
 
                    }
 
                }
 
            }
 
        }
 
        // Observed zero inconsistencies. A unified predicate exists...
 
        match [s_not_o.is_empty(), o_not_s.is_empty()] {
 
            [true, true] => Aur::Equivalent,       // ... equivalent to both.
 
            [false, true] => Aur::FormerNotLatter, // ... equivalent to self.
 
            [true, false] => Aur::LatterNotFormer, // ... equivalent to other.
 
            [false, false] => {
 
                // ... which is the union of the predicates' assignments but
 
                //     is equivalent to neither self nor other.
 
                let mut new = self.clone();
 
                for (&id, &b) in o_not_s {
 
                    new.assigned.insert(id, b);
 
                }
 
                Aur::New(new)
 
            }
 
        }
 
    }
 

	
 
    // Compute the union of the assignments of the two given predicates, if it exists.
 
    // It doesn't exist if there is some variable which the predicates assign to distinct values.
 
    pub(crate) fn union_with(&self, other: &Self) -> Option<Self> {
 
        let mut res = self.clone();
 
        for (&channel_id, &assignment_1) in other.assigned.iter() {
 
            match res.assigned.insert(channel_id, assignment_1) {
 
                Some(assignment_2) if assignment_1 != assignment_2 => return None,
 
                _ => {}
 
            }
 
        }
 
        Some(res)
 
    }
 
    pub(crate) fn query(&self, var: SpecVar) -> Option<SpecVal> {
 
        self.assigned.get(&var).copied()
 
    }
 
}
 

	
 
impl RoundCtx {
 
    // remove an arbitrary buffered message, along with the ID of the getter who receives it
 
    fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> {
 
        self.payload_inbox.pop()
 
    }
 

	
 
    // buffer a message along with the ID of the getter who receives it
 
    fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.payload_inbox.push((getter, msg));
 
    }
 

	
 
    // buffer a message along with the ID of the putter who sent it
 
    fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) {
 
        if let Some(getter) = self.ips.port_info.get(&putter).unwrap().peer {
 
            log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter);
 
            self.getter_push(getter, msg);
 
        } else {
 
            log!(cu.logger(), "Putter {:?} has no known peer!", putter);
 
            panic!("Putter {:?} has no known peer!");
 
        }
 
    }
 
}
 

	
 
impl<T: Debug + std::cmp::Ord> Debug for VecSet<T> {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        f.debug_set().entries(self.vec.iter()).finish()
 
    }
 
}
 
impl Debug for Predicate {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        struct Assignment<'a>((&'a SpecVar, &'a SpecVal));
 
        impl Debug for Assignment<'_> {
 
            fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
                write!(f, "{:?}={:?}", (self.0).0, (self.0).1)
 
            }
 
        }
 
        f.debug_set().entries(self.assigned.iter().map(Assignment)).finish()
 
    }
 
}
 
impl serde::Serialize for SerdeProtocolDescription {
 
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
 
    where
 
        S: serde::Serializer,
 
    {
 
        let inner: &ProtocolDescription = &self.0;
 
        inner.serialize(serializer)
 
    }
 
}
 
impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription {
 
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
 
    where
 
        D: serde::Deserializer<'de>,
 
    {
 
        let inner: ProtocolDescription = ProtocolDescription::deserialize(deserializer)?;
 
        Ok(Self(Arc::new(inner)))
 
    }
 
}
 
impl IdParts for SpecVar {
 
    fn id_parts(self) -> (ConnectorId, U32Suffix) {
 
        self.0.id_parts()
 
    }
 
}
 
impl Debug for SpecVar {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        let (a, b) = self.id_parts();
 
        write!(f, "v{}_{}", a, b)
 
    }
 
}
 
impl SpecVal {
 
    const FIRING: Self = SpecVal(1);
 
    const SILENT: Self = SpecVal(0);
 
    fn is_firing(self) -> bool {
 
        self == Self::FIRING
 
        // all else treated as SILENT
 
    }
 
    fn iter_domain() -> impl Iterator<Item = Self> {
 
        (0..).map(SpecVal)
 
    }
 
}
 
impl Debug for SpecVal {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        self.0.fmt(f)
 
    }
 
}
 
impl Default for UdpInBuffer {
 
    fn default() -> Self {
        // zero-initialize the buffer up front; calling `set_len` on a
        // freshly-allocated vector would expose uninitialized memory
        let byte_vec = vec![0u8; Self::CAPACITY];
        Self { byte_vec }
    }
 
}
 
impl UdpInBuffer {
 
    const CAPACITY: usize = u16::MAX as usize;
 
    fn as_mut_slice(&mut self) -> &mut [u8] {
 
        self.byte_vec.as_mut_slice()
 
    }
 
}
 

	
 
impl Debug for UdpInBuffer {
 
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
 
        write!(f, "UdpInBuffer")
 
    }
 
}
 

	
 
impl RoundCtx {
 
    fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> {
 
        self.payload_inbox.pop()
 
    }
 
    fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.payload_inbox.push((getter, msg));
 
    }
 
    fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) {
 
        if let Some(getter) = self.current_state.port_info.get(&putter).unwrap().peer {
 
            log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter);
 
            self.getter_push(getter, msg);
 
        } else {
 
            log!(cu.logger(), "Putter {:?} has no known peer!", putter);
 
            panic!("Putter {:?} has no known peer!");
 
        }
 
    }
 
}
src/runtime/setup.rs
Show inline comments
 
@@ -7,247 +7,247 @@ impl TokenTarget {
 
    const WAKER_TOKEN: usize = Self::MAX_INDEX;
 
}
 
impl From<Token> for TokenTarget {
 
    fn from(Token(index): Token) -> Self {
 
        if index == Self::WAKER_TOKEN {
 
            TokenTarget::Waker
 
        } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
 
            TokenTarget::UdpEndpoint { index: shifted }
 
        } else {
 
            TokenTarget::NetEndpoint { index }
 
        }
 
    }
 
}
 
impl Into<Token> for TokenTarget {
 
    fn into(self) -> Token {
 
        match self {
 
            TokenTarget::Waker => Token(Self::WAKER_TOKEN),
 
            TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
 
            TokenTarget::NetEndpoint { index } => Token(index),
 
        }
 
    }
 
}
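// Sketch of the intended round-trip property (hypothetical test): converting a
// TokenTarget into a mio Token and back is the identity, for indices below HALFWAY_INDEX.
#[test]
fn token_target_roundtrip_sketch() {
    let targets = [
        TokenTarget::Waker,
        TokenTarget::NetEndpoint { index: 3 },
        TokenTarget::UdpEndpoint { index: 3 },
    ];
    for &tt in targets.iter() {
        let token: Token = tt.into();
        assert_eq!(TokenTarget::from(token), tt);
    }
}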
 
impl Connector {
 
    /// Create a new connector structure with the given protocol description (via Arc to facilitate sharing).
 
    /// The resulting connector will start in the setup phase, and cannot be used for communication until the
 
    /// `connect` procedure completes.
 
    /// # Safety
 
    /// The correctness of the system's underlying distributed algorithms requires that no two
 
    /// connectors have the same ID. If the user does not know the identifiers of other connectors in the
 
    /// system, it is advised to guess it using Connector::random_id (relying on the exceptionally low probability of an error).
 
    /// Sessions with duplicate connector identifiers will not result in any memory unsafety, but cannot be guaranteed
 
    /// to preserve their configured protocols.
 
    /// Fortunately, in most realistic cases, the presence of duplicate connector identifiers will result in an
 
    /// error during `connect`, observed as a peer misbehaving.
 
    pub fn new(
 
        mut logger: Box<dyn Logger>,
 
        proto_description: Arc<ProtocolDescription>,
 
        connector_id: ConnectorId,
 
    ) -> Self {
 
        log!(&mut *logger, "Created with connector_id {:?}", connector_id);
 
        let mut id_manager = IdManager::new(connector_id);
 
        let native_component_id = id_manager.new_component_id();
 
        Self {
 
            unphased: ConnectorUnphased {
 
                proto_description,
 
                proto_components: Default::default(),
 
                logger,
 
                native_component_id,
 
                current_state: CurrentState { id_manager, port_info: Default::default() },
 
                ips: IdAndPortState { id_manager, port_info: Default::default() },
 
            },
 
            phased: ConnectorPhased::Setup(Box::new(ConnectorSetup {
 
                net_endpoint_setups: Default::default(),
 
                udp_endpoint_setups: Default::default(),
 
            })),
 
        }
 
    }
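    // Hypothetical usage sketch (illustrative associated fn, not part of the API):
    // construct a connector in its setup phase with a fresh random identifier and
    // the trivial DummyLogger (assumed to be in scope).
    fn sketch_new(pd: Arc<ProtocolDescription>) -> Self {
        Self::new(Box::new(DummyLogger), pd, Self::random_id())
    }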
 
    /// Conceptually, this call, returning [p0, g1], is sugar for:
 
    /// 1. create port pair [p0, g0]
 
    /// 2. create port pair [p1, g1]
 
    /// 3. create udp component with interface of moved ports [p1, g0]
 
    /// 4. return [p0, g1]
 
    pub fn new_udp_mediator_component(
 
        &mut self,
 
        local_addr: SocketAddr,
 
        peer_addr: SocketAddr,
 
    ) -> Result<[PortId; 2], WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let udp_index = setup.udp_endpoint_setups.len();
 
                let udp_cid = cu.current_state.id_manager.new_component_id();
 
                let mut npid = || cu.current_state.id_manager.new_port_id();
 
                let udp_cid = cu.ips.id_manager.new_component_id();
 
                let mut npid = || cu.ips.id_manager.new_port_id();
 
                let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()];
 

	
 
                cu.current_state.port_info.insert(
 
                cu.ips.port_info.insert(
 
                    nin,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Getter,
 
                        peer: Some(uout),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                cu.current_state.port_info.insert(
 
                cu.ips.port_info.insert(
 
                    nout,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        polarity: Putter,
 
                        peer: Some(uin),
 
                        owner: cu.native_component_id,
 
                    },
 
                );
 
                cu.current_state.port_info.insert(
 
                cu.ips.port_info.insert(
 
                    uin,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Getter,
 
                        peer: Some(nout),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                cu.current_state.port_info.insert(
 
                cu.ips.port_info.insert(
 
                    uout,
 
                    PortInfo {
 
                        route: Route::UdpEndpoint { index: udp_index },
 
                        polarity: Putter,
 
                        peer: Some(nin),
 
                        owner: udp_cid,
 
                    },
 
                );
 
                setup.udp_endpoint_setups.push(UdpEndpointSetup {
 
                    local_addr,
 
                    peer_addr,
 
                    getter_for_incoming: nin,
 
                });
 
                Ok([nout, nin])
 
            }
 
        }
 
    }
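    // Hypothetical usage sketch (illustrative, not part of the API): set up a UDP
    // mediator bound locally at 127.0.0.1:9000 and peered with 127.0.0.1:8000;
    // the addresses are assumptions for illustration only.
    fn sketch_udp_channel(&mut self) -> Result<[PortId; 2], WrongStateError> {
        let local_addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let peer_addr: SocketAddr = "127.0.0.1:8000".parse().unwrap();
        self.new_udp_mediator_component(local_addr, peer_addr)
    }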
 

	
 
    /// Adds a "dangling" port to the connector in the setup phase,
 
    /// to be formed into a channel during the connect procedure with the given
 
    /// transport layer information.
 
    pub fn new_net_port(
 
        &mut self,
 
        polarity: Polarity,
 
        sock_addr: SocketAddr,
 
        endpoint_polarity: EndpointPolarity,
 
    ) -> Result<PortId, WrongStateError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Communication(..) => Err(WrongStateError),
 
            ConnectorPhased::Setup(setup) => {
 
                let new_pid = cu.current_state.id_manager.new_port_id();
 
                cu.current_state.port_info.insert(
 
                let new_pid = cu.ips.id_manager.new_port_id();
 
                cu.ips.port_info.insert(
 
                    new_pid,
 
                    PortInfo {
 
                        route: Route::LocalComponent,
 
                        peer: None,
 
                        owner: cu.native_component_id,
 
                        polarity,
 
                    },
 
                );
 
                log!(
 
                    cu.logger,
 
                    "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}",
 
                    new_pid,
 
                    polarity,
 
                    &sock_addr,
 
                    endpoint_polarity
 
                );
 
                setup.net_endpoint_setups.push(NetEndpointSetup {
 
                    sock_addr,
 
                    endpoint_polarity,
 
                    getter_for_incoming: new_pid,
 
                });
 
                Ok(new_pid)
 
            }
 
        }
 
    }
 

	
 
    /// Finalizes the connector's setup procedure and forms a distributed system with
 
    /// all other connectors reachable through network channels. This procedure represents
 
    /// a synchronization barrier, and upon successful return, the connector can no longer add new network ports,
 
    /// but is ready to begin the first communication round.
 
    /// Initially, the connector has a singleton set of _batches_, the only element of which is empty.
 
    /// This single element starts off selected. The selected batch is modified with `put` and `get`,
 
    /// and new batches are added and selected with `next_batch`. See `sync` for an explanation of the
 
    /// purpose of these batches.
 
    pub fn connect(&mut self, timeout: Option<Duration>) -> Result<(), ConnectError> {
 
        use ConnectError as Ce;
 
        let Self { unphased: cu, phased } = self;
 
        match &phased {
 
            ConnectorPhased::Communication { .. } => {
 
                log!(cu.logger, "Call to connecting in connected state");
 
                Err(Ce::AlreadyConnected)
 
            }
 
            ConnectorPhased::Setup(setup) => {
 
                log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout);
 
                let deadline = timeout.map(|to| Instant::now() + to);
 
                // connect all endpoints in parallel; send and receive peer ids through ports
 
                let mut endpoint_manager = new_endpoint_manager(
 
                    &mut *cu.logger,
 
                    &setup.net_endpoint_setups,
 
                    &setup.udp_endpoint_setups,
 
                    &mut cu.current_state.port_info,
 
                    &mut cu.ips.port_info,
 
                    &deadline,
 
                )?;
 
                log!(
 
                    cu.logger,
 
                    "Successfully connected {} endpoints. info now {:#?} {:#?}",
 
                    endpoint_manager.net_endpoint_store.endpoint_exts.len(),
 
                    &cu.current_state.port_info,
 
                    &cu.ips.port_info,
 
                    &endpoint_manager,
 
                );
 
                // leader election and tree construction
 
                let neighborhood = init_neighborhood(
 
                    cu.current_state.id_manager.connector_id,
 
                    cu.ips.id_manager.connector_id,
 
                    &mut *cu.logger,
 
                    &mut endpoint_manager,
 
                    &deadline,
 
                )?;
 
                log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
 
                let mut comm = ConnectorCommunication {
 
                    round_index: 0,
 
                    endpoint_manager,
 
                    neighborhood,
 
                    native_batches: vec![Default::default()],
 
                    round_result: Ok(None),
 
                };
 
                if cfg!(feature = "session_optimization") {
 
                    session_optimize(cu, &mut comm, &deadline)?;
 
                }
 
                log!(cu.logger, "connect() finished. setup phase complete");
 
                *phased = ConnectorPhased::Communication(Box::new(comm));
 
                Ok(())
 
            }
 
        }
 
    }
 
}
 
fn new_endpoint_manager(
 
    logger: &mut dyn Logger,
 
    net_endpoint_setups: &[NetEndpointSetup],
 
    udp_endpoint_setups: &[UdpEndpointSetup],
 
    port_info: &mut HashMap<PortId, PortInfo>,
 
    deadline: &Option<Instant>,
 
) -> Result<EndpointManager, ConnectError> {
 
    ////////////////////////////////////////////
 
    use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
 
    use ConnectError as Ce;
 
    const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
 
    const WAKER_PERIOD: Duration = Duration::from_millis(300);
 
    struct WakerState {
 
        continue_signal: AtomicBool,
 
        waker: mio::Waker,
 
    }
 
    impl WakerState {
 
        fn waker_loop(&self) {
 
            while self.continue_signal.load(SeqCst) {
 
                std::thread::sleep(WAKER_PERIOD);
 
                let _ = self.waker.wake();
 
            }
 
        }
 
        fn waker_stop(&self) {
 
            self.continue_signal.store(false, SeqCst);
 
            // TODO keep waker registered?
 
@@ -813,156 +813,154 @@ fn session_optimize(
 
        log!(
 
            cu.logger,
 
            "Session gather loop. awaiting info from children {:?}...",
 
            awaiting.iter()
 
        );
 
        let (recv_index, msg) =
 
            comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
        log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
        match msg {
 
            S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => {
 
                if !awaiting.remove(&recv_index) {
 
                    log!(
 
                        cu.logger,
 
                        "Wasn't expecting session info from {:?}. Got {:?}",
 
                        recv_index,
 
                        &child_unoptimized_map
 
                    );
 
                    return Err(Ce::SetupAlgMisbehavior);
 
                }
 
                unoptimized_map.extend(child_unoptimized_map.into_iter());
 
            }
 
            msg @ S(Sm::YouAreMyParent)
 
            | msg @ S(Sm::MyPortInfo(..))
 
            | msg @ S(Sm::LeaderAnnounce { .. })
 
            | msg @ S(Sm::LeaderWave { .. }) => {
 
                log!(cu.logger, "discarding old message {:?} during election", msg);
 
            }
 
            msg @ S(Sm::SessionScatter { .. }) => {
 
                log!(
 
                    cu.logger,
 
                    "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!",
 
                    recv_index,
 
                    &msg
 
                );
 
                return Err(Ce::SetupAlgMisbehavior);
 
            }
 
            msg @ Msg::CommMsg(..) => {
 
                log!(cu.logger, "delaying msg {:?} during session optimization", msg);
 
                comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
            }
 
        }
 
    }
 
    log!(
 
        cu.logger,
 
        "Gathered all children's maps. ConnectorId set is... {:?}",
 
        unoptimized_map.keys()
 
    );
 
    let my_session_info = SessionInfo {
 
        port_info: cu.current_state.port_info.clone(),
 
        port_info: cu.ips.port_info.clone(),
 
        proto_components: cu.proto_components.clone(),
 
        serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
 
        endpoint_incoming_to_getter: comm
 
            .endpoint_manager
 
            .net_endpoint_store
 
            .endpoint_exts
 
            .iter()
 
            .map(|ee| ee.getter_for_incoming)
 
            .collect(),
 
    };
 
    unoptimized_map.insert(cu.current_state.id_manager.connector_id, my_session_info);
 
    unoptimized_map.insert(cu.ips.id_manager.connector_id, my_session_info);
 
    log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map);
 

	
 
    // acquire the optimized info...
 
    let optimized_map = if let Some(parent) = comm.neighborhood.parent {
 
        // ... as a message from my parent
 
        log!(cu.logger, "Forwarding gathered info to parent {:?}", parent);
 
        let msg = S(Sm::SessionGather { unoptimized_map });
 
        comm.endpoint_manager.send_to_setup(parent, &msg)?;
 
        'scatter_loop: loop {
 
            log!(
 
                cu.logger,
 
                "Session scatter recv loop. awaiting info from children {:?}...",
 
                awaiting.iter()
 
            );
 
            let (recv_index, msg) =
 
                comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?;
 
            log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg);
 
            match msg {
 
                S(Sm::SessionScatter { optimized_map }) => {
 
                    if recv_index != parent {
 
                        log!(cu.logger, "I expected the scatter from my parent only!");
 
                        return Err(Ce::SetupAlgMisbehavior);
 
                    }
 
                    break 'scatter_loop optimized_map;
 
                }
 
                msg @ Msg::CommMsg { .. } => {
 
                    log!(cu.logger, "delaying msg {:?} during scatter recv", msg);
 
                    comm.endpoint_manager.delayed_messages.push((recv_index, msg));
 
                }
 
                msg @ S(Sm::SessionGather { .. })
 
                | msg @ S(Sm::YouAreMyParent)
 
                | msg @ S(Sm::MyPortInfo(..))
 
                | msg @ S(Sm::LeaderAnnounce { .. })
 
                | msg @ S(Sm::LeaderWave { .. }) => {
 
                    log!(cu.logger, "discarding old message {:?} during election", msg);
 
                }
 
            }
 
        }
 
    } else {
 
        // by computing it myself
 
        log!(cu.logger, "I am the leader! I will optimize this session");
 
        leader_session_map_optimize(&mut *cu.logger, unoptimized_map)?
 
    };
 
    log!(
 
        cu.logger,
 
        "Optimized info map is {:?}. Sending to children {:?}",
 
        &optimized_map,
 
        comm.neighborhood.children.iter()
 
    );
 
    log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map);
 
    let optimized_info = optimized_map
 
        .get(&cu.current_state.id_manager.connector_id)
 
        .expect("HEY NO INFO FOR ME?")
 
        .clone();
 
    let optimized_info =
 
        optimized_map.get(&cu.ips.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone();
 
    let msg = S(Sm::SessionScatter { optimized_map });
 
    for &child in comm.neighborhood.children.iter() {
 
        comm.endpoint_manager.send_to_setup(child, &msg)?;
 
    }
 
    apply_optimizations(cu, comm, optimized_info)?;
 
    log!(cu.logger, "Session optimizations applied");
 
    Ok(())
 
}
 
fn leader_session_map_optimize(
 
    logger: &mut dyn Logger,
 
    unoptimized_map: HashMap<ConnectorId, SessionInfo>,
 
) -> Result<HashMap<ConnectorId, SessionInfo>, ConnectError> {
 
    log!(logger, "Session map optimize START");
 
    log!(logger, "Session map optimize END");
 
    Ok(unoptimized_map)
 
}
 
fn apply_optimizations(
 
    cu: &mut ConnectorUnphased,
 
    comm: &mut ConnectorCommunication,
 
    session_info: SessionInfo,
 
) -> Result<(), ConnectError> {
 
    let SessionInfo {
 
        proto_components,
 
        port_info,
 
        serde_proto_description,
 
        endpoint_incoming_to_getter,
 
    } = session_info;
 
    // TODO some info which should be read-only can be mutated with the current scheme
 
    cu.current_state.port_info = port_info;
 
    cu.ips.port_info = port_info;
 
    cu.proto_components = proto_components;
 
    cu.proto_description = serde_proto_description.0;
 
    for (ee, getter) in comm
 
        .endpoint_manager
 
        .net_endpoint_store
 
        .endpoint_exts
 
        .iter_mut()
 
        .zip(endpoint_incoming_to_getter)
 
    {
 
        ee.getter_for_incoming = getter;
 
    }
 
    Ok(())
 
}
0 comments (0 inline, 0 general)