Changeset - ebd352ab717e
[Not reviewed]
0 1 0
Christopher Esterhuyse - 5 years ago 2020-07-15 14:14:09
christopher.esterhuyse@gmail.com
smarter (safe) caching and reuse of temporary map structures during the sync round
1 file changed with 71 insertions and 25 deletions:
0 comments (0 inline, 0 general)
src/runtime/communication.rs
Show inline comments
 
use super::*;
 
use crate::common::*;
 
use core::ops::Deref;
 
use core::ops::DerefMut;
 

	
 
////////////////
 
// Guard protecting an incrementally unfoldable slice of MapTempGuard elements
 
struct MapTempsGuard<'a, K, V>(&'a mut [HashMap<K, V>]);
 
// Type protecting a temporary map; At the start and end of the Guard's lifetime, self.0.is_empty()
 
struct MapTempGuard<'a, K, V>(&'a mut HashMap<K, V>);
 

	
 
#[derive(Default)]
 
struct GetterBuffer {
 
    getters_and_sends: Vec<(PortId, SendPayloadMsg)>,
 
}
 
struct RoundCtx {
 
    solution_storage: SolutionStorage,
 
    spec_var_stream: SpecVarStream,
 
    getter_buffer: GetterBuffer,
 
    deadline: Option<Instant>,
 
}
 
struct BranchingNative {
 
    branches: HashMap<Predicate, NativeBranch>,
 
}
 
#[derive(Clone, Debug)]
 
struct NativeBranch {
 
    index: usize,
 
    gotten: HashMap<PortId, Payload>,
 
    to_get: HashSet<PortId>,
 
}
 
#[derive(Debug)]
 
struct SolutionStorage {
 
    old_local: HashSet<Predicate>,
 
    new_local: HashSet<Predicate>,
 
    // this pair acts as SubtreeId -> HashSet<Predicate> which is friendlier to iteration
 
    subtree_solutions: Vec<HashSet<Predicate>>,
 
    subtree_id_to_index: HashMap<SubtreeId, usize>,
 
}
 
#[derive(Debug)]
 
struct BranchingProtoComponent {
 
    ports: HashSet<PortId>,
 
    branches: HashMap<Predicate, ProtoComponentBranch>,
 
}
 
#[derive(Debug, Clone)]
 
struct ProtoComponentBranch {
 
    did_put_or_get: HashSet<PortId>,
 
    inbox: HashMap<PortId, Payload>,
 
    state: ComponentState,
 
    untaken_choice: Option<u16>,
 
    ended: bool,
 
}
 
struct CyclicDrainer<'a, K: Eq + Hash, V> {
 
    input: &'a mut HashMap<K, V>,
 
    inner: CyclicDrainInner<'a, K, V>,
 
}
 
struct CyclicDrainInner<'a, K: Eq + Hash, V> {
 
    swap: &'a mut HashMap<K, V>,
 
    output: &'a mut HashMap<K, V>,
 
}
 
trait ReplaceBoolTrue {
 
    fn replace_with_true(&mut self) -> bool;
 
}
 
impl ReplaceBoolTrue for bool {
 
    fn replace_with_true(&mut self) -> bool {
 
        let was = *self;
 
        *self = true;
 
        !was
 
    }
 
}
 

	
 
////////////////
 
impl<'a, K, V> MapTempsGuard<'a, K, V> {
 
    fn reborrow(&mut self) -> MapTempsGuard<'_, K, V> {
 
        MapTempsGuard(self.0)
 
    }
 
    fn split_first_mut(&mut self) -> (MapTempGuard<'_, K, V>, MapTempsGuard<'_, K, V>) {
 
        let (head, tail) = self.0.split_first_mut().expect("Cache exhausted");
 
        (MapTempGuard::new(head), MapTempsGuard(tail))
 
    }
 
}
 
impl<'a, K, V> MapTempGuard<'a, K, V> {
 
    fn new(map: &'a mut HashMap<K, V>) -> Self {
 
        map.clear();
 
        Self(map)
 
    }
 
}
 
impl<'a, K, V> Drop for MapTempGuard<'a, K, V> {
 
    fn drop(&mut self) {
 
        self.0.clear()
 
    }
 
}
 
impl<'a, K, V> Deref for MapTempGuard<'a, K, V> {
 
    type Target = HashMap<K, V>;
 
    fn deref(&self) -> &<Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> {
 
    fn deref_mut(&mut self) -> &mut <Self as Deref>::Target {
 
        self.0
 
    }
 
}
 
impl RoundCtxTrait for RoundCtx {
 
    fn get_deadline(&self) -> &Option<Instant> {
 
        &self.deadline
 
    }
 
    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
 
        self.getter_buffer.getter_add(getter, msg)
 
    }
 
}
 
impl Connector {
 
    fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> {
 
        if let ConnectorPhased::Communication(comm) = &mut self.phased {
 
            Some(comm)
 
        } else {
 
            None
 
        }
 
    }
 
    // pub(crate) fn get_mut_udp_sock(&mut self, index: usize) -> Option<&mut UdpSocket> {
 
    //     let sock = &mut self
 
    //         .get_comm_mut()?
 
    //         .endpoint_manager
 
    //         .udp_endpoint_store
 
    //         .endpoint_exts
 
    //         .get_mut(index)?
 
    //         .sock;
 
    //     Some(sock)
 
    // }
 
    pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
 
        use GottenError as Ge;
 
        let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?;
 
        match &comm.round_result {
 
            Err(_) => Err(Ge::PreviousSyncFailed),
 
            Ok(None) => Err(Ge::NoPreviousRound),
 
            Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet),
 
        }
 
    }
 
    pub fn next_batch(&mut self) -> Result<usize, WrongStateError> {
 
        // returns index of new batch
 
        let comm = self.get_comm_mut().ok_or(WrongStateError)?;
 
        comm.native_batches.push(Default::default());
 
        Ok(comm.native_batches.len() - 1)
 
    }
 
    fn port_op_access(
 
        &mut self,
 
        port: PortId,
 
        expect_polarity: Polarity,
 
    ) -> Result<&mut NativeBatch, PortOpError> {
 
        use PortOpError as Poe;
 
        let Self { unphased, phased } = self;
 
        if !unphased.native_ports.contains(&port) {
 
            return Err(Poe::PortUnavailable);
 
        }
 
        match unphased.port_info.polarities.get(&port) {
 
            Some(p) if *p == expect_polarity => {}
 
            Some(_) => return Err(Poe::WrongPolarity),
 
            None => return Err(Poe::UnknownPolarity),
 
        }
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(Poe::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant
 
                Ok(batch)
 
            }
 
        }
 
    }
 
    pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Putter)?;
 
        if batch.to_put.contains_key(&port) {
 
            Err(Poe::MultipleOpsOnPort)
 
        } else {
 
            batch.to_put.insert(port, payload);
 
            Ok(())
 
        }
 
    }
 
    pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> {
 
        use PortOpError as Poe;
 
        let batch = self.port_op_access(port, Getter)?;
 
        if batch.to_get.insert(port) {
 
            Ok(())
 
        } else {
 
            Err(Poe::MultipleOpsOnPort)
 
        }
 
    }
 
    // entrypoint for caller. overwrites round result enum, and returns what happened
 
    pub fn sync(&mut self, timeout: Option<Duration>) -> Result<usize, SyncError> {
 
        let Self { unphased: cu, phased } = self;
 
        match phased {
 
            ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected),
 
            ConnectorPhased::Communication(comm) => {
 
                match &comm.round_result {
 
                    Err(SyncError::Unrecoverable(e)) => {
 
                        log!(cu.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e);
 
                        return Err(SyncError::Unrecoverable(e.clone()));
 
                    }
 
                    _ => {}
 
                }
 
@@ -314,475 +352,482 @@ impl Connector {
 
                rctx.solution_storage.submit_and_digest_subtree_solution(
 
                    &mut *cu.logger,
 
                    SubtreeId::LocalComponent(ComponentId::Native),
 
                    predicate.clone(),
 
                );
 
            }
 
            if let Some(_) = branching_native.branches.insert(predicate, branch) {
 
                // thanks to the native_branch_spec_var, each batch has a distinct predicate
 
                unreachable!()
 
            }
 
        }
 
        // restore the invariant: !native_batches.is_empty()
 
        comm.native_batches.push(Default::default());
 

	
 
        comm.endpoint_manager.udp_endpoints_round_start(&mut *cu.logger, &mut rctx.spec_var_stream);
 
        // Call to another big method; keep running this round until a distributed decision is reached
 
        let decision = Self::sync_reach_decision(
 
            cu,
 
            comm,
 
            &mut branching_native,
 
            &mut branching_proto_components,
 
            &mut rctx,
 
        )?;
 
        log!(cu.logger, "Committing to decision {:?}!", &decision);
 
        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger, &decision)?;
 

	
 
        // propagate the decision to children
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
 
                decision: decision.clone(),
 
            }),
 
        });
 
        log!(
 
            cu.logger,
 
            "Announcing decision {:?} through child endpoints {:?}",
 
            &msg,
 
            &comm.neighborhood.children
 
        );
 
        for &child in comm.neighborhood.children.iter() {
 
            comm.endpoint_manager.send_to_comms(child, &msg)?;
 
        }
 
        let ret = match decision {
 
            Decision::Failure => {
 
                // dropping {branching_proto_components, branching_native}
 
                Err(Se::RoundFailure)
 
            }
 
            Decision::Success(predicate) => {
 
                // commit changes to component states
 
                cu.proto_components.clear();
 
                cu.proto_components.extend(
 
                    // consume branching proto components
 
                    branching_proto_components
 
                        .into_iter()
 
                        .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
 
                );
 
                log!(
 
                    cu.logger,
 
                    "End round with (updated) component states {:?}",
 
                    cu.proto_components.keys()
 
                );
 
                // consume native
 
                Ok(Some(branching_native.collapse_with(&mut *cu.logger, &predicate)))
 
            }
 
        };
 
        log!(cu.logger, "Sync round ending! Cleaning up");
 
        // dropping {solution_storage, payloads_to_get}
 
        ret
 
    }
 

	
 
    fn sync_reach_decision(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        branching_native: &mut BranchingNative,
 
        branching_proto_components: &mut HashMap<ProtoComponentId, BranchingProtoComponent>,
 
        rctx: &mut RoundCtx,
 
    ) -> Result<Decision, UnrecoverableSyncError> {
 
        let mut already_requested_failure = false;
 
        if branching_native.branches.is_empty() {
 
            log!(cu.logger, "Native starts with no branches! Failure!");
 
            match comm.neighborhood.parent {
 
                Some(parent) => {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger, "Already requested failure");
 
                    }
 
                }
 
                None => {
 
                    log!(cu.logger, "No parent. Deciding on failure");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.logger, "Done translating native batches into branches");
 

	
 
        let mut pcb_temps_owner = <[HashMap<Predicate, ProtoComponentBranch>; 3]>::default();
 
        let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner);
 
        let mut bn_temp_owner = <HashMap<Predicate, NativeBranch>>::default();
 

	
 
        // run all proto components to their sync blocker
 
        log!(
 
            cu.logger,
 
            "Running all {} proto components to their sync blocker...",
 
            branching_proto_components.len()
 
        );
 
        for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() {
 
            let BranchingProtoComponent { ports, branches } = proto_component;
 
            let mut swap = HashMap::default();
 
            let (swap, mut pcb_temps) = pcb_temps.split_first_mut();
 
            let (blocked, _pcb_temps) = pcb_temps.split_first_mut();
 
            // initially, no components have .ended==true
 
            let mut blocked = HashMap::default();
 
            // drain from branches --> blocked
 
            let cd = CyclicDrainer::new(branches, &mut swap, &mut blocked);
 
            let cd = CyclicDrainer::new(branches, swap.0, blocked.0);
 
            BranchingProtoComponent::drain_branches_to_blocked(
 
                cd,
 
                cu,
 
                rctx,
 
                proto_component_id,
 
                ports,
 
            )?;
 
            // swap the blocked branches back
 
            std::mem::swap(&mut blocked, branches);
 
            std::mem::swap(blocked.0, branches);
 
            if branches.is_empty() {
 
                log!(cu.logger, "{:?} has become inconsistent!", proto_component_id);
 
                if let Some(parent) = comm.neighborhood.parent {
 
                    if already_requested_failure.replace_with_true() {
 
                        Self::request_failure(cu, comm, parent)?
 
                    } else {
 
                        log!(cu.logger, "Already requested failure");
 
                    }
 
                } else {
 
                    log!(cu.logger, "As the leader, deciding on timeout");
 
                    return Ok(Decision::Failure);
 
                }
 
            }
 
        }
 
        log!(cu.logger, "All proto components are blocked");
 

	
 
        log!(cu.logger, "Entering decision loop...");
 
        comm.endpoint_manager.undelay_all();
 
        'undecided: loop {
 
            // drain payloads_to_get, sending them through endpoints / feeding them to components
 
            log!(cu.logger, "Decision loop! have {} messages to recv", rctx.getter_buffer.len());
 
            while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() {
 
                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
                let route = cu.port_info.routes.get(&getter);
 
                log!(
 
                    cu.logger,
 
                    "Routing msg {:?} to {:?} via {:?}",
 
                    &send_payload_msg,
 
                    getter,
 
                    &route
 
                );
 
                match route {
 
                    None => log!(cu.logger, "Delivery failed. Physical route unmapped!"),
 
                    Some(Route::UdpEndpoint { index }) => {
 
                        let udp_endpoint_ext =
 
                            &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[*index];
 
                        let SendPayloadMsg { predicate, payload } = send_payload_msg;
 
                        log!(cu.logger, "Delivering to udp endpoint index={}", index);
 
                        udp_endpoint_ext.outgoing_payloads.insert(predicate, payload);
 
                    }
 
                    Some(Route::NetEndpoint { index }) => {
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::SendPayload(send_payload_msg),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(*index, &msg)?;
 
                    }
 
                    Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg(
 
                        cu,
 
                        &mut rctx.solution_storage,
 
                        rctx,
 
                        getter,
 
                        &send_payload_msg,
 
                        MapTempGuard::new(&mut bn_temp_owner),
 
                    ),
 
                    Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => {
 
                        if let Some(branching_component) =
 
                            branching_proto_components.get_mut(proto_component_id)
 
                        {
 
                            let proto_component_id = *proto_component_id;
 
                            branching_component.feed_msg(
 
                                cu,
 
                                rctx,
 
                                proto_component_id,
 
                                getter,
 
                                &send_payload_msg,
 
                                pcb_temps.reborrow(),
 
                            )?;
 
                            if branching_component.branches.is_empty() {
 
                                log!(
 
                                    cu.logger,
 
                                    "{:?} has become inconsistent!",
 
                                    proto_component_id
 
                                );
 
                                if let Some(parent) = comm.neighborhood.parent {
 
                                    if already_requested_failure.replace_with_true() {
 
                                        Self::request_failure(cu, comm, parent)?
 
                                    } else {
 
                                        log!(cu.logger, "Already requested failure");
 
                                    }
 
                                } else {
 
                                    log!(cu.logger, "As the leader, deciding on timeout");
 
                                    return Ok(Decision::Failure);
 
                                }
 
                            }
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Delivery to getter {:?} msg {:?} failed because {:?} isn't here",
 
                                getter,
 
                                &send_payload_msg,
 
                                proto_component_id
 
                            );
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            // check if we have a solution yet
 
            log!(cu.logger, "Check if we have any local decisions...");
 
            for solution in rctx.solution_storage.iter_new_local_make_old() {
 
                log!(cu.logger, "New local decision with solution {:?}...", &solution);
 
                match comm.neighborhood.parent {
 
                    Some(parent) => {
 
                        log!(cu.logger, "Forwarding to my parent {:?}", parent);
 
                        let suggestion = Decision::Success(solution);
 
                        let msg = Msg::CommMsg(CommMsg {
 
                            round_index: comm.round_index,
 
                            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest {
 
                                suggestion,
 
                            }),
 
                        });
 
                        comm.endpoint_manager.send_to_comms(parent, &msg)?;
 
                    }
 
                    None => {
 
                        log!(cu.logger, "No parent. Deciding on solution {:?}", &solution);
 
                        return Ok(Decision::Success(solution));
 
                    }
 
                }
 
            }
 

	
 
            // stuck! make progress by receiving a msg
 
            // try recv messages arriving through endpoints
 
            log!(cu.logger, "No decision yet. Let's recv an endpoint msg...");
 
            {
 
                let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) =
 
                    match comm.endpoint_manager.try_recv_any_comms(
 
                        &mut *cu.logger,
 
                        &cu.port_info,
 
                        rctx,
 
                        comm.round_index,
 
                    )? {
 
                        CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg),
 
                        CommRecvOk::NewPayloadMsgs => continue 'undecided,
 
                        CommRecvOk::TimeoutWithoutNew => {
 
                            log!(cu.logger, "Reached user-defined deadling without decision...");
 
                            if let Some(parent) = comm.neighborhood.parent {
 
                                if already_requested_failure.replace_with_true() {
 
                                    Self::request_failure(cu, comm, parent)?
 
                                } else {
 
                                    log!(cu.logger, "Already requested failure");
 
                                }
 
                            } else {
 
                                log!(cu.logger, "As the leader, deciding on timeout");
 
                                return Ok(Decision::Failure);
 
                            }
 
                            rctx.deadline = None;
 
                            continue 'undecided;
 
                        }
 
                    };
 
                log!(
 
                    cu.logger,
 
                    "Received from endpoint {} ctrl msg  {:?}",
 
                    net_index,
 
                    &comm_ctrl_msg
 
                );
 
                match comm_ctrl_msg {
 
                    CommCtrlMsg::Suggest { suggestion } => {
 
                        // only accept this control msg through a child endpoint
 
                        if comm.neighborhood.children.contains(&net_index) {
 
                            match suggestion {
 
                                Decision::Success(predicate) => {
 
                                    // child solution contributes to local solution
 
                                    log!(cu.logger, "Child provided solution {:?}", &predicate);
 
                                    let subtree_id = SubtreeId::NetEndpoint { index: net_index };
 
                                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                                        &mut *cu.logger,
 
                                        subtree_id,
 
                                        predicate,
 
                                    );
 
                                }
 
                                Decision::Failure => {
 
                                    match comm.neighborhood.parent {
 
                                        None => {
 
                                            log!(cu.logger, "I decide on my child's failure");
 
                                            break 'undecided Ok(Decision::Failure);
 
                                        }
 
                                        Some(parent) => {
 
                                            log!(cu.logger, "Forwarding failure through my parent endpoint {:?}", parent);
 
                                            if already_requested_failure.replace_with_true() {
 
                                                Self::request_failure(cu, comm, parent)?
 
                                            } else {
 
                                                log!(cu.logger, "Already requested failure");
 
                                            }
 
                                        }
 
                                    }
 
                                }
 
                            }
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Discarding suggestion {:?} from non-child endpoint idx {:?}",
 
                                &suggestion,
 
                                net_index
 
                            );
 
                        }
 
                    }
 
                    CommCtrlMsg::Announce { decision } => {
 
                        if Some(net_index) == comm.neighborhood.parent {
 
                            // adopt this decision
 
                            return Ok(decision);
 
                        } else {
 
                            log!(
 
                                cu.logger,
 
                                "Discarding announcement {:?} from non-parent endpoint idx {:?}",
 
                                &decision,
 
                                net_index
 
                            );
 
                        }
 
                    }
 
                }
 
            }
 
            log!(cu.logger, "Endpoint msg recv done");
 
        }
 
    }
 
    fn request_failure(
 
        cu: &mut ConnectorUnphased,
 
        comm: &mut ConnectorCommunication,
 
        parent: usize,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        log!(cu.logger, "Forwarding to my parent {:?}", parent);
 
        let suggestion = Decision::Failure;
 
        let msg = Msg::CommMsg(CommMsg {
 
            round_index: comm.round_index,
 
            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { suggestion }),
 
        });
 
        comm.endpoint_manager.send_to_comms(parent, &msg)
 
    }
 
}
 
impl NativeBranch {
 
    fn is_ended(&self) -> bool {
 
        self.to_get.is_empty()
 
    }
 
}
 
impl BranchingNative {
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        solution_storage: &mut SolutionStorage,
 
        round_ctx: &mut RoundCtx,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
        bn_temp: MapTempGuard<'_, Predicate, NativeBranch>,
 
    ) {
 
        log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
 
        assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
 
        let mut draining = HashMap::default();
 
        let mut draining = bn_temp;
 
        let finished = &mut self.branches;
 
        std::mem::swap(&mut draining, finished);
 
        std::mem::swap(draining.0, finished);
 
        for (predicate, mut branch) in draining.drain() {
 
            log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
 
            // check if this branch expects to receive it
 
            let var = cu.port_info.spec_var_for(getter);
 
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
 
                branch.to_get.remove(&getter);
 
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
 
                assert!(was.is_none());
 
                if branch.is_ended() {
 
                    log!(
 
                        cu.logger,
 
                        "new native solution with {:?} is_ended() with gotten {:?}",
 
                        &predicate,
 
                        &branch.gotten
 
                    );
 
                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Native);
 
                    solution_storage.submit_and_digest_subtree_solution(
 
                    round_ctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                } else {
 
                    log!(
 
                        cu.logger,
 
                        "Fed native {:?} still has to_get {:?}",
 
                        &predicate,
 
                        &branch.to_get
 
                    );
 
                }
 
            };
 
            if predicate.query(var) != Some(SpecVal::FIRING) {
 
                // optimization. Don't bother trying this branch
 
                log!(
 
                    cu.logger,
 
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
 
                    &predicate
 
                );
 
                Self::insert_branch_merging(finished, predicate, branch);
 
                continue;
 
            }
 
            use AssignmentUnionResult as Aur;
 
            match predicate.assignment_union(&send_payload_msg.predicate) {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(
 
                        cu.logger,
 
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
 
                        &predicate
 
                    );
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                }
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    feed_branch(&mut branch, &predicate);
 
                    log!(cu.logger, "branch pred covers it! Accept the msg");
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.logger,
 
                        "payload pred {:?} covers branch pred {:?}",
 
                        &predicate2,
 
                        &predicate
 
                    );
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate2, branch2);
 
                }
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    let mut branch2 = branch.clone();
 
                    feed_branch(&mut branch2, &predicate2);
 
                    log!(
 
                        cu.logger,
 
                        "new subsuming pred created {:?}. forking and feeding",
 
                        &predicate2
 
                    );
 
                    Self::insert_branch_merging(finished, predicate, branch);
 
                    Self::insert_branch_merging(finished, predicate2, branch2);
 
                }
 
            }
 
        }
 
    }
 
    fn insert_branch_merging(
 
        branches: &mut HashMap<Predicate, NativeBranch>,
 
        predicate: Predicate,
 
        mut branch: NativeBranch,
 
    ) {
 
        let e = branches.entry(predicate);
 
        use std::collections::hash_map::Entry;
 
        match e {
 
            Entry::Vacant(ev) => {
 
                // no existing branch present. We insert it no problem. (The most common case)
 
                ev.insert(branch);
 
            }
 
            Entry::Occupied(mut eo) => {
 
                // Oh dear, there is already a branch with this predicate.
 
                // Rather than choosing either branch, we MERGE them.
 
                // This means taking the UNION of their .gotten and the INTERSECTION of their .to_get
 
                let old = eo.get_mut();
 
                for (k, v) in branch.gotten.drain() {
 
                    if old.gotten.insert(k, v).is_none() {
 
                        // added a gotten element in `branch` not already in `old`
 
                        old.to_get.remove(&k);
 
                    }
 
                }
 
            }
 
        }
 
    }
 
    fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk {
 
@@ -817,297 +862,298 @@ impl BranchingProtoComponent {
 
        proto_component_id: ProtoComponentId,
 
        ports: &HashSet<PortId>,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        cd.cyclic_drain(|mut predicate, mut branch, mut drainer| {
 
            let mut ctx = SyncProtoContext {
 
                untaken_choice: &mut branch.untaken_choice,
 
                logger: &mut *cu.logger,
 
                predicate: &predicate,
 
                port_info: &cu.port_info,
 
                inbox: &branch.inbox,
 
                did_put_or_get: &mut branch.did_put_or_get,
 
            };
 
            let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
 
            log!(
 
                cu.logger,
 
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
 
                proto_component_id,
 
                &predicate,
 
                &blocker,
 
            );
 
            use SyncBlocker as B;
 
            match blocker {
 
                B::NondetChoice { n } => {
 
                    let var = rctx.spec_var_stream.next();
 
                    for val in SpecVal::iter_domain().take(n as usize) {
 
                        let pred = predicate.clone().inserted(var, val);
 
                        let mut branch_n = branch.clone();
 
                        branch_n.untaken_choice = Some(val.0);
 
                        drainer.add_input(pred, branch_n);
 
                    }
 
                }
 
                B::Inconsistent => {
 
                    // EXPLICIT inconsistency
 
                    drop((predicate, branch));
 
                }
 
                B::SyncBlockEnd => {
 
                    // make concrete all variables
 
                    for port in ports.iter() {
 
                        let var = cu.port_info.spec_var_for(*port);
 
                        let should_have_fired = branch.did_put_or_get.contains(port);
 
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
 
                        let did_fire = val == SpecVal::FIRING;
 
                        if did_fire != should_have_fired {
 
                            log!(cu.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired);
 
                            // IMPLICIT inconsistency
 
                            drop((predicate, branch));
 
                            return Ok(());
 
                        }
 
                    }
 
                    // submit solution for this component
 
                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id));
 
                    rctx.solution_storage.submit_and_digest_subtree_solution(
 
                        &mut *cu.logger,
 
                        subtree_id,
 
                        predicate.clone(),
 
                    );
 
                    branch.ended = true;
 
                    // move to "blocked"
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntReadMsg(port) => {
 
                    // move to "blocked"
 
                    assert!(!branch.inbox.contains_key(&port));
 
                    drainer.add_output(predicate, branch);
 
                }
 
                B::CouldntCheckFiring(port) => {
 
                    // sanity check
 
                    let var = cu.port_info.spec_var_for(port);
 
                    assert!(predicate.query(var).is_none());
 
                    // keep forks in "unblocked"
 
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
 
                    drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch);
 
                }
 
                B::PutMsg(putter, payload) => {
 
                    // sanity check
 
                    assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
 
                    // overwrite assignment
 
                    let var = cu.port_info.spec_var_for(putter);
 
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
 
                    if was == Some(SpecVal::SILENT) {
 
                        log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
 
                        // discard forever
 
                        drop((predicate, branch));
 
                    } else {
 
                        // keep in "unblocked"
 
                        branch.did_put_or_get.insert(putter);
 
                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
 
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
 
                        rctx.getter_buffer.putter_add(cu, putter, msg);
 
                        drainer.add_input(predicate, branch);
 
                    }
 
                }
 
            }
 
            Ok(())
 
        })
 
    }
 
    fn branch_merge_func(
 
        mut a: ProtoComponentBranch,
 
        b: &mut ProtoComponentBranch,
 
    ) -> ProtoComponentBranch {
 
        if b.ended && !a.ended {
 
            a.ended = true;
 
            std::mem::swap(&mut a, b);
 
        }
 
        a
 
    }
 
    // fn branch_merge_func(
 
    //     mut a: ProtoComponentBranch,
 
    //     b: &mut ProtoComponentBranch,
 
    // ) -> ProtoComponentBranch {
 
    //     if b.ended && !a.ended {
 
    //         a.ended = true;
 
    //         std::mem::swap(&mut a, b);
 
    //     }
 
    //     a
 
    // }
 
    fn feed_msg(
 
        &mut self,
 
        cu: &mut ConnectorUnphased,
 
        rctx: &mut RoundCtx,
 
        proto_component_id: ProtoComponentId,
 
        getter: PortId,
 
        send_payload_msg: &SendPayloadMsg,
 
        mut pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>,
 
    ) -> Result<(), UnrecoverableSyncError> {
 
        let logger = &mut *cu.logger;
 
        log!(
 
            logger,
 
            "feeding proto component {:?} getter {:?} {:?}",
 
            proto_component_id,
 
            getter,
 
            &send_payload_msg
 
        );
 
        let BranchingProtoComponent { branches, ports } = self;
 
        let mut unblocked = HashMap::default();
 
        let mut blocked = HashMap::default();
 
        let (mut unblocked, mut pcb_temps) = pcb_temps.split_first_mut();
 
        let (mut blocked, mut pcb_temps) = pcb_temps.split_first_mut();
 
        // partition drain from branches -> {unblocked, blocked}
 
        log!(logger, "visiting {} blocked branches...", branches.len());
 
        for (predicate, mut branch) in branches.drain() {
 
            if branch.ended {
 
                log!(logger, "Skipping ended branch with {:?}", &predicate);
 
                blocked.insert(predicate, branch);
 
                continue;
 
            }
 
            use AssignmentUnionResult as Aur;
 
            log!(logger, "visiting branch with pred {:?}", &predicate);
 
            match predicate.assignment_union(&send_payload_msg.predicate) {
 
                Aur::Nonexistant => {
 
                    // this branch does not receive the message
 
                    log!(logger, "skipping branch");
 
                    blocked.insert(predicate, branch);
 
                }
 
                Aur::Equivalent | Aur::FormerNotLatter => {
 
                    // retain the existing predicate, but add this payload
 
                    log!(logger, "feeding this branch without altering its predicate");
 
                    branch.feed_msg(getter, send_payload_msg.payload.clone());
 
                    unblocked.insert(predicate, branch);
 
                }
 
                Aur::LatterNotFormer => {
 
                    // fork branch, give fork the message and payload predicate. original branch untouched
 
                    log!(logger, "Forking this branch, giving it the predicate of the msg");
 
                    let mut branch2 = branch.clone();
 
                    let predicate2 = send_payload_msg.predicate.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
                Aur::New(predicate2) => {
 
                    // fork branch, give fork the message and the new predicate. original branch untouched
 
                    log!(logger, "Forking this branch with new predicate {:?}", &predicate2);
 
                    let mut branch2 = branch.clone();
 
                    branch2.feed_msg(getter, send_payload_msg.payload.clone());
 
                    blocked.insert(predicate, branch);
 
                    unblocked.insert(predicate2, branch2);
 
                }
 
            }
 
        }
 
        log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len());
 
        // drain from unblocked --> blocked
 
        let mut swap = HashMap::default();
 
        let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked);
 
        let (swap, _pcb_temps) = pcb_temps.split_first_mut();
 
        let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0);
 
        BranchingProtoComponent::drain_branches_to_blocked(
 
            cd,
 
            cu,
 
            rctx,
 
            proto_component_id,
 
            ports,
 
        )?;
 
        // swap the blocked branches back
 
        std::mem::swap(&mut blocked, branches);
 
        std::mem::swap(blocked.0, branches);
 
        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
 
        Ok(())
 
    }
 
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
 
        let BranchingProtoComponent { ports, branches } = self;
 
        for (branch_predicate, branch) in branches {
 
            if branch.ended && branch_predicate.assigns_subset(solution_predicate) {
 
                let ProtoComponentBranch { state, .. } = branch;
 
                return ProtoComponent { state, ports };
 
            }
 
        }
 
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
 
    }
 
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
 
        let branch = ProtoComponentBranch {
 
            inbox: Default::default(),
 
            did_put_or_get: Default::default(),
 
            state,
 
            ended: false,
 
            untaken_choice: None,
 
        };
 
        Self { ports, branches: hashmap! { Predicate::default() => branch  } }
 
        Self { ports, branches: hashmap! { Predicate::default() => branch } }
 
    }
 
}
 
impl SolutionStorage {
 
    fn new(subtree_ids: impl Iterator<Item = SubtreeId>) -> Self {
 
        let mut subtree_id_to_index: HashMap<SubtreeId, usize> = Default::default();
 
        let mut subtree_solutions = vec![];
 
        for id in subtree_ids {
 
            subtree_id_to_index.insert(id, subtree_solutions.len());
 
            subtree_solutions.push(Default::default())
 
        }
 
        Self {
 
            subtree_solutions,
 
            subtree_id_to_index,
 
            old_local: Default::default(),
 
            new_local: Default::default(),
 
        }
 
    }
 
    fn is_clear(&self) -> bool {
 
        self.subtree_id_to_index.is_empty()
 
            && self.subtree_solutions.is_empty()
 
            && self.old_local.is_empty()
 
            && self.new_local.is_empty()
 
    }
 
    fn clear(&mut self) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
    }
 
    fn reset(&mut self, subtree_ids: impl Iterator<Item = SubtreeId>) {
 
        self.subtree_id_to_index.clear();
 
        self.subtree_solutions.clear();
 
        self.old_local.clear();
 
        self.new_local.clear();
 
        for key in subtree_ids {
 
            self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
 
            self.subtree_solutions.push(Default::default())
 
        }
 
    }
 
    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
 
        let Self { old_local, new_local, .. } = self;
 
        new_local.drain().map(move |local| {
 
            old_local.insert(local.clone());
 
            local
 
        })
 
    }
 
    pub(crate) fn submit_and_digest_subtree_solution(
 
        &mut self,
 
        logger: &mut dyn Logger,
 
        subtree_id: SubtreeId,
 
        predicate: Predicate,
 
    ) {
 
        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
 
        let index = self.subtree_id_to_index[&subtree_id];
 
        let left = 0..index;
 
        let right = (index + 1)..self.subtree_solutions.len();
 

	
 
        let Self { subtree_solutions, new_local, old_local, .. } = self;
 
        let was_new = subtree_solutions[index].insert(predicate.clone());
 
        if was_new {
 
            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
 
            Self::elaborate_into_new_local_rec(
 
                logger,
 
                predicate,
 
                set_visitor,
 
                old_local,
 
                new_local,
 
            );
 
        }
 
    }
 
    fn elaborate_into_new_local_rec<'a, 'b>(
 
        logger: &mut dyn Logger,
 
        partial: Predicate,
 
        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
 
        old_local: &'b HashSet<Predicate>,
 
        new_local: &'a mut HashSet<Predicate>,
 
    ) {
 
        if let Some(set) = set_visitor.next() {
 
            // incomplete solution. keep traversing
 
            for pred in set.iter() {
 
                if let Some(elaborated) = pred.union_with(&partial) {
 
                    Self::elaborate_into_new_local_rec(
 
                        logger,
 
                        elaborated,
 
                        set_visitor.clone(),
 
                        old_local,
 
                        new_local,
 
                    )
 
                }
 
            }
 
        } else {
 
            // recursive stop condition. `partial` is a local subtree solution
 
            if !old_local.contains(&partial) {
 
                // ... and it hasn't been found before
 
                log!(logger, "storing NEW LOCAL SOLUTION {:?}", &partial);
 
                new_local.insert(partial);
0 comments (0 inline, 0 general)