use super::*; use crate::common::*; use core::ops::{Deref, DerefMut}; //////////////// // Guard protecting an incrementally unfoldable slice of MapTempGuard elements struct MapTempsGuard<'a, K, V>(&'a mut [HashMap]); // Type protecting a temporary map; At the start and end of the Guard's lifetime, self.0.is_empty() must be true struct MapTempGuard<'a, K, V>(&'a mut HashMap); #[derive(Default)] struct GetterBuffer { getters_and_sends: Vec<(PortId, SendPayloadMsg)>, } struct RoundCtx { solution_storage: SolutionStorage, spec_var_stream: SpecVarStream, getter_buffer: GetterBuffer, deadline: Option, } struct BranchingNative { branches: HashMap, } #[derive(Clone, Debug)] struct NativeBranch { index: usize, gotten: HashMap, to_get: HashSet, } #[derive(Debug)] struct SolutionStorage { old_local: HashSet, new_local: HashSet, // this pair acts as SubtreeId -> HashSet which is friendlier to iteration subtree_solutions: Vec>, subtree_id_to_index: HashMap, } #[derive(Debug)] struct BranchingProtoComponent { ports: HashSet, branches: HashMap, } #[derive(Debug, Clone)] struct ProtoComponentBranch { state: ComponentState, inner: ProtoComponentBranchInner, ended: bool, } struct CyclicDrainer<'a, K: Eq + Hash, V> { input: &'a mut HashMap, inner: CyclicDrainInner<'a, K, V>, } struct CyclicDrainInner<'a, K: Eq + Hash, V> { swap: &'a mut HashMap, output: &'a mut HashMap, } trait ReplaceBoolTrue { fn replace_with_true(&mut self) -> bool; } impl ReplaceBoolTrue for bool { fn replace_with_true(&mut self) -> bool { let was = *self; *self = true; !was } } //////////////// impl<'a, K, V> MapTempsGuard<'a, K, V> { fn reborrow(&mut self) -> MapTempsGuard<'_, K, V> { MapTempsGuard(self.0) } fn split_first_mut(self) -> (MapTempGuard<'a, K, V>, MapTempsGuard<'a, K, V>) { let (head, tail) = self.0.split_first_mut().expect("Cache exhausted"); (MapTempGuard::new(head), MapTempsGuard(tail)) } } impl<'a, K, V> MapTempGuard<'a, K, V> { fn new(map: &'a mut HashMap) -> Self { assert!(map.is_empty()); // sanity check Self(map) } } impl<'a, K, V> Drop for MapTempGuard<'a, K, V> { fn drop(&mut self) { assert!(self.0.is_empty()); // sanity check } } impl<'a, K, V> Deref for MapTempGuard<'a, K, V> { type Target = HashMap; fn deref(&self) -> &::Target { self.0 } } impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> { fn deref_mut(&mut self) -> &mut ::Target { self.0 } } impl RoundCtxTrait for RoundCtx { fn get_deadline(&self) -> &Option { &self.deadline } fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) { self.getter_buffer.getter_add(getter, msg) } } impl Connector { fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> { if let ConnectorPhased::Communication(comm) = &mut self.phased { Some(comm) } else { None } } pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { use GottenError as Ge; let comm = self.get_comm_mut().ok_or(Ge::NoPreviousRound)?; match &comm.round_result { Err(_) => Err(Ge::PreviousSyncFailed), Ok(None) => Err(Ge::NoPreviousRound), Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet), } } pub fn next_batch(&mut self) -> Result { // returns index of new batch let comm = self.get_comm_mut().ok_or(WrongStateError)?; comm.native_batches.push(Default::default()); Ok(comm.native_batches.len() - 1) } fn port_op_access( &mut self, port: PortId, expect_polarity: Polarity, ) -> Result<&mut NativeBatch, PortOpError> { use PortOpError as Poe; let Self { unphased: cu, phased } = self; if !cu.inner.native_ports.contains(&port) { return Err(Poe::PortUnavailable); } match cu.inner.port_info.polarities.get(&port) { Some(p) if *p == expect_polarity => {} Some(_) => return Err(Poe::WrongPolarity), None => return Err(Poe::UnknownPolarity), } match phased { ConnectorPhased::Setup { .. } => Err(Poe::NotConnected), ConnectorPhased::Communication(comm) => { let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant Ok(batch) } } } pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> { use PortOpError as Poe; let batch = self.port_op_access(port, Putter)?; if batch.to_put.contains_key(&port) { Err(Poe::MultipleOpsOnPort) } else { batch.to_put.insert(port, payload); Ok(()) } } pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> { use PortOpError as Poe; let batch = self.port_op_access(port, Getter)?; if batch.to_get.insert(port) { Ok(()) } else { Err(Poe::MultipleOpsOnPort) } } // entrypoint for caller. overwrites round result enum, and returns what happened pub fn sync(&mut self, timeout: Option) -> Result { let Self { unphased: cu, phased } = self; match phased { ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected), ConnectorPhased::Communication(comm) => { match &comm.round_result { Err(SyncError::Unrecoverable(e)) => { log!(cu.inner.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e); return Err(SyncError::Unrecoverable(e.clone())); } _ => {} } comm.round_result = Self::connected_sync(cu, comm, timeout); comm.round_index += 1; match &comm.round_result { Ok(None) => unreachable!(), Ok(Some(ok_result)) => Ok(ok_result.batch_index), Err(sync_error) => Err(sync_error.clone()), } } } } // private function. mutates state but returns with round // result ASAP (allows for convenient error return with ?) fn connected_sync( cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, timeout: Option, ) -> Result, SyncError> { ////////////////////////////////// use SyncError as Se; ////////////////////////////////// log!( cu.inner.logger, "~~~ SYNC called with timeout {:?}; starting round {}", &timeout, comm.round_index ); // 1. run all proto components to Nonsync blockers // NOTE: original components are immutable until Decision::Success let mut branching_proto_components = HashMap::::default(); let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len()); // drains unrun_components, and populates branching_proto_components. while let Some((proto_component_id, mut component)) = unrun_components.pop() { // TODO coalesce fields log!( cu.inner.logger, "Nonsync running proto component with ID {:?}. {} to go after this", proto_component_id, unrun_components.len() ); let mut ctx = NonsyncProtoContext { cu_inner: &mut cu.inner, proto_component_id, unrun_components: &mut unrun_components, proto_component_ports: &mut cu .proto_components .get_mut(&proto_component_id) .unwrap() // unrun_components' keys originate from proto_components .ports, }; let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description); log!( cu.inner.logger, "proto component {:?} ran to nonsync blocker {:?}", proto_component_id, &blocker ); use NonsyncBlocker as B; match blocker { B::ComponentExit => drop(component), B::Inconsistent => return Err(Se::InconsistentProtoComponent(proto_component_id)), B::SyncBlockStart => { branching_proto_components .insert(proto_component_id, BranchingProtoComponent::initial(component)); } } } log!( cu.inner.logger, "All {} proto components are now done with Nonsync phase", branching_proto_components.len(), ); // Create temp structures needed for the synchronous phase of the round let mut rctx = RoundCtx { solution_storage: { let n = std::iter::once(SubtreeId::LocalComponent(ComponentId::Native)); let c = cu .proto_components .keys() .map(|&id| SubtreeId::LocalComponent(ComponentId::Proto(id))); let e = comm .neighborhood .children .iter() .map(|&index| SubtreeId::NetEndpoint { index }); let subtree_id_iter = n.chain(c).chain(e); log!( cu.inner.logger, "Children in subtree are: {:?}", subtree_id_iter.clone().collect::>() ); SolutionStorage::new(subtree_id_iter) }, spec_var_stream: cu.inner.id_manager.new_spec_var_stream(), getter_buffer: Default::default(), deadline: timeout.map(|to| Instant::now() + to), }; log!(cu.inner.logger, "Round context structure initialized"); // Explore all native branches eagerly. Find solutions, buffer messages, etc. log!( cu.inner.logger, "Translating {} native batches into branches...", comm.native_batches.len() ); let native_spec_var = rctx.spec_var_stream.next(); log!(cu.inner.logger, "Native branch spec var is {:?}", native_spec_var); let mut branching_native = BranchingNative { branches: Default::default() }; 'native_branches: for ((native_branch, index), branch_spec_val) in comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain()) { let NativeBatch { to_get, to_put } = native_branch; let predicate = { let mut predicate = Predicate::default(); // assign trues for ports that fire let firing_ports: HashSet = to_get.iter().chain(to_put.keys()).copied().collect(); for &port in to_get.iter().chain(to_put.keys()) { let var = cu.inner.port_info.spec_var_for(port); predicate.assigned.insert(var, SpecVal::FIRING); } // assign falses for all silent (not firing) ports for &port in cu.inner.native_ports.difference(&firing_ports) { let var = cu.inner.port_info.spec_var_for(port); if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); continue 'native_branches; } } // this branch is consistent. distinguish it with a unique var:val mapping and proceed predicate.inserted(native_spec_var, branch_spec_val) }; log!( cu.inner.logger, "Native branch index={:?} has consistent {:?}", index, &predicate ); // send all outgoing messages (by buffering them) for (putter, payload) in to_put { let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg); rctx.getter_buffer.putter_add(cu, putter, msg); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; if branch.is_ended() { log!( cu.inner.logger, "Native submitting solution for batch {} with {:?}", index, &predicate ); rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.inner.logger, SubtreeId::LocalComponent(ComponentId::Native), predicate.clone(), ); } if let Some(_) = branching_native.branches.insert(predicate, branch) { // thanks to the native_spec_var, each batch has a distinct predicate unreachable!() } } // restore the invariant: !native_batches.is_empty() comm.native_batches.push(Default::default()); comm.endpoint_manager.udp_endpoints_round_start(&mut *cu.inner.logger); // Call to another big method; keep running this round until a distributed decision is reached let decision = Self::sync_reach_decision( cu, comm, &mut branching_native, &mut branching_proto_components, &mut rctx, )?; log!(cu.inner.logger, "Committing to decision {:?}!", &decision); comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?; // propagate the decision to children let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce { decision: decision.clone(), }), }); log!( cu.inner.logger, "Announcing decision {:?} through child endpoints {:?}", &msg, &comm.neighborhood.children ); for &child in comm.neighborhood.children.iter() { comm.endpoint_manager.send_to_comms(child, &msg)?; } let ret = match decision { Decision::Failure => { // dropping {branching_proto_components, branching_native} Err(Se::RoundFailure) } Decision::Success(predicate) => { // commit changes to component states cu.proto_components.clear(); cu.proto_components.extend( // consume branching proto components branching_proto_components .into_iter() .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), ); log!( cu.inner.logger, "End round with (updated) component states {:?}", cu.proto_components.keys() ); // consume native Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate))) } }; log!(cu.inner.logger, "Sync round ending! Cleaning up"); ret } fn sync_reach_decision( cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, branching_native: &mut BranchingNative, branching_proto_components: &mut HashMap, rctx: &mut RoundCtx, ) -> Result { let mut already_requested_failure = false; if branching_native.branches.is_empty() { log!(cu.inner.logger, "Native starts with no branches! Failure!"); match comm.neighborhood.parent { Some(parent) => { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? } else { log!(cu.inner.logger, "Already requested failure"); } } None => { log!(cu.inner.logger, "No parent. Deciding on failure"); return Ok(Decision::Failure); } } } log!(cu.inner.logger, "Done translating native batches into branches"); let mut pcb_temps_owner = <[HashMap; 3]>::default(); let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner); let mut bn_temp_owner = >::default(); // run all proto components to their sync blocker log!( cu.inner.logger, "Running all {} proto components to their sync blocker...", branching_proto_components.len() ); for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() { let BranchingProtoComponent { ports, branches } = proto_component; // must reborrow to constrain the lifetime of pcb_temps to inside the loop let (swap, pcb_temps) = pcb_temps.reborrow().split_first_mut(); let (blocked, _pcb_temps) = pcb_temps.split_first_mut(); // initially, no components have .ended==true // drain from branches --> blocked let cd = CyclicDrainer::new(branches, swap.0, blocked.0); BranchingProtoComponent::drain_branches_to_blocked( cd, cu, rctx, proto_component_id, ports, )?; // swap the blocked branches back std::mem::swap(blocked.0, branches); if branches.is_empty() { log!(cu.inner.logger, "{:?} has become inconsistent!", proto_component_id); if let Some(parent) = comm.neighborhood.parent { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? } else { log!(cu.inner.logger, "Already requested failure"); } } else { log!(cu.inner.logger, "As the leader, deciding on timeout"); return Ok(Decision::Failure); } } } log!(cu.inner.logger, "All proto components are blocked"); log!(cu.inner.logger, "Entering decision loop..."); comm.endpoint_manager.undelay_all(); 'undecided: loop { // drain payloads_to_get, sending them through endpoints / feeding them to components log!( cu.inner.logger, "Decision loop! have {} messages to recv", rctx.getter_buffer.len() ); while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() { assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter)); let route = cu.inner.port_info.routes.get(&getter); log!( cu.inner.logger, "Routing msg {:?} to {:?} via {:?}", &send_payload_msg, getter, &route ); match route { None => log!(cu.inner.logger, "Delivery failed. Physical route unmapped!"), Some(Route::UdpEndpoint { index }) => { let udp_endpoint_ext = &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[*index]; let SendPayloadMsg { predicate, payload } = send_payload_msg; log!(cu.inner.logger, "Delivering to udp endpoint index={}", index); udp_endpoint_ext.outgoing_payloads.insert(predicate, payload); } Some(Route::NetEndpoint { index }) => { let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, contents: CommMsgContents::SendPayload(send_payload_msg), }); comm.endpoint_manager.send_to_comms(*index, &msg)?; } Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg( cu, rctx, getter, &send_payload_msg, MapTempGuard::new(&mut bn_temp_owner), ), Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => { if let Some(branching_component) = branching_proto_components.get_mut(proto_component_id) { let proto_component_id = *proto_component_id; branching_component.feed_msg( cu, rctx, proto_component_id, getter, &send_payload_msg, pcb_temps.reborrow(), )?; if branching_component.branches.is_empty() { log!( cu.inner.logger, "{:?} has become inconsistent!", proto_component_id ); if let Some(parent) = comm.neighborhood.parent { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? } else { log!(cu.inner.logger, "Already requested failure"); } } else { log!(cu.inner.logger, "As the leader, deciding on timeout"); return Ok(Decision::Failure); } } } else { log!( cu.inner.logger, "Delivery to getter {:?} msg {:?} failed because {:?} isn't here", getter, &send_payload_msg, proto_component_id ); } } } } // check if we have a solution yet log!(cu.inner.logger, "Check if we have any local decisions..."); for solution in rctx.solution_storage.iter_new_local_make_old() { log!(cu.inner.logger, "New local decision with solution {:?}...", &solution); match comm.neighborhood.parent { Some(parent) => { log!(cu.inner.logger, "Forwarding to my parent {:?}", parent); let suggestion = Decision::Success(solution); let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { suggestion, }), }); comm.endpoint_manager.send_to_comms(parent, &msg)?; } None => { log!(cu.inner.logger, "No parent. Deciding on solution {:?}", &solution); return Ok(Decision::Success(solution)); } } } // stuck! make progress by receiving a msg // try recv messages arriving through endpoints log!(cu.inner.logger, "No decision yet. Let's recv an endpoint msg..."); { let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm .endpoint_manager .try_recv_any_comms( &mut *cu.inner.logger, &cu.inner.port_info, rctx, comm.round_index, )? { CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg), CommRecvOk::NewPayloadMsgs => continue 'undecided, CommRecvOk::TimeoutWithoutNew => { log!(cu.inner.logger, "Reached user-defined deadling without decision..."); if let Some(parent) = comm.neighborhood.parent { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? } else { log!(cu.inner.logger, "Already requested failure"); } } else { log!(cu.inner.logger, "As the leader, deciding on timeout"); return Ok(Decision::Failure); } rctx.deadline = None; continue 'undecided; } }; log!( cu.inner.logger, "Received from endpoint {} ctrl msg {:?}", net_index, &comm_ctrl_msg ); match comm_ctrl_msg { CommCtrlMsg::Suggest { suggestion } => { // only accept this control msg through a child endpoint if comm.neighborhood.children.contains(&net_index) { match suggestion { Decision::Success(predicate) => { // child solution contributes to local solution log!( cu.inner.logger, "Child provided solution {:?}", &predicate ); let subtree_id = SubtreeId::NetEndpoint { index: net_index }; rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.inner.logger, subtree_id, predicate, ); } Decision::Failure => { match comm.neighborhood.parent { None => { log!(cu.inner.logger, "I decide on my child's failure"); break 'undecided Ok(Decision::Failure); } Some(parent) => { log!(cu.inner.logger, "Forwarding failure through my parent endpoint {:?}", parent); if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? } else { log!(cu.inner.logger, "Already requested failure"); } } } } } } else { log!( cu.inner.logger, "Discarding suggestion {:?} from non-child endpoint idx {:?}", &suggestion, net_index ); } } CommCtrlMsg::Announce { decision } => { if Some(net_index) == comm.neighborhood.parent { // adopt this decision return Ok(decision); } else { log!( cu.inner.logger, "Discarding announcement {:?} from non-parent endpoint idx {:?}", &decision, net_index ); } } } } log!(cu.inner.logger, "Endpoint msg recv done"); } } fn request_failure( cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, parent: usize, ) -> Result<(), UnrecoverableSyncError> { log!(cu.inner.logger, "Forwarding to my parent {:?}", parent); let suggestion = Decision::Failure; let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { suggestion }), }); comm.endpoint_manager.send_to_comms(parent, &msg) } } impl NativeBranch { fn is_ended(&self) -> bool { self.to_get.is_empty() } } impl BranchingNative { fn feed_msg( &mut self, cu: &mut ConnectorUnphased, round_ctx: &mut RoundCtx, getter: PortId, send_payload_msg: &SendPayloadMsg, bn_temp: MapTempGuard<'_, Predicate, NativeBranch>, ) { log!(cu.inner.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg); assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter)); let mut draining = bn_temp; let finished = &mut self.branches; std::mem::swap(draining.0, finished); for (predicate, mut branch) in draining.drain() { log!(cu.inner.logger, "visiting native branch {:?} with {:?}", &branch, &predicate); // check if this branch expects to receive it let var = cu.inner.port_info.spec_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { branch.to_get.remove(&getter); let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); assert!(was.is_none()); if branch.is_ended() { log!( cu.inner.logger, "new native solution with {:?} is_ended() with gotten {:?}", &predicate, &branch.gotten ); let subtree_id = SubtreeId::LocalComponent(ComponentId::Native); round_ctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.inner.logger, subtree_id, predicate.clone(), ); } else { log!( cu.inner.logger, "Fed native {:?} still has to_get {:?}", &predicate, &branch.to_get ); } }; if predicate.query(var) != Some(SpecVal::FIRING) { // optimization. Don't bother trying this branch log!( cu.inner.logger, "skipping branch with {:?} that doesn't want the message (fastpath)", &predicate ); Self::insert_branch_merging(finished, predicate, branch); continue; } use AssignmentUnionResult as Aur; match predicate.assignment_union(&send_payload_msg.predicate) { Aur::Nonexistant => { // this branch does not receive the message log!( cu.inner.logger, "skipping branch with {:?} that doesn't want the message (slowpath)", &predicate ); Self::insert_branch_merging(finished, predicate, branch); } Aur::Equivalent | Aur::FormerNotLatter => { // retain the existing predicate, but add this payload feed_branch(&mut branch, &predicate); log!(cu.inner.logger, "branch pred covers it! Accept the msg"); Self::insert_branch_merging(finished, predicate, branch); } Aur::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched let mut branch2 = branch.clone(); let predicate2 = send_payload_msg.predicate.clone(); feed_branch(&mut branch2, &predicate2); log!( cu.inner.logger, "payload pred {:?} covers branch pred {:?}", &predicate2, &predicate ); Self::insert_branch_merging(finished, predicate, branch); Self::insert_branch_merging(finished, predicate2, branch2); } Aur::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched let mut branch2 = branch.clone(); feed_branch(&mut branch2, &predicate2); log!( cu.inner.logger, "new subsuming pred created {:?}. forking and feeding", &predicate2 ); Self::insert_branch_merging(finished, predicate, branch); Self::insert_branch_merging(finished, predicate2, branch2); } } } } fn insert_branch_merging( branches: &mut HashMap, predicate: Predicate, mut branch: NativeBranch, ) { let e = branches.entry(predicate); use std::collections::hash_map::Entry; match e { Entry::Vacant(ev) => { // no existing branch present. We insert it no problem. (The most common case) ev.insert(branch); } Entry::Occupied(mut eo) => { // Oh dear, there is already a branch with this predicate. // Rather than choosing either branch, we MERGE them. // This means taking the UNION of their .gotten and the INTERSECTION of their .to_get let old = eo.get_mut(); for (k, v) in branch.gotten.drain() { if old.gotten.insert(k, v).is_none() { // added a gotten element in `branch` not already in `old` old.to_get.remove(&k); } } } } } fn collapse_with(self, logger: &mut dyn Logger, solution_predicate: &Predicate) -> RoundOk { log!( logger, "Collapsing native with {} branch preds {:?}", self.branches.len(), self.branches.keys() ); for (branch_predicate, branch) in self.branches { log!( logger, "Considering native branch {:?} with to_get {:?} gotten {:?}", &branch_predicate, &branch.to_get, &branch.gotten ); if branch.is_ended() && branch_predicate.assigns_subset(solution_predicate) { let NativeBranch { index, gotten, .. } = branch; log!(logger, "Collapsed native has gotten {:?}", &gotten); return RoundOk { batch_index: index, gotten }; } } panic!("Native had no branches matching pred {:?}", solution_predicate); } } impl BranchingProtoComponent { fn drain_branches_to_blocked( cd: CyclicDrainer, cu: &mut ConnectorUnphased, rctx: &mut RoundCtx, proto_component_id: ProtoComponentId, ports: &HashSet, ) -> Result<(), UnrecoverableSyncError> { cd.cyclic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { cu_inner: &mut cu.inner, predicate: &predicate, branch_inner: &mut branch.inner, }; let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description); log!( cu.inner.logger, "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", proto_component_id, &predicate, &blocker, ); use SyncBlocker as B; match blocker { B::Inconsistent => drop((predicate, branch)), // EXPLICIT inconsistency B::NondetChoice { n } => { let var = rctx.spec_var_stream.next(); for val in SpecVal::iter_domain().take(n as usize) { let pred = predicate.clone().inserted(var, val); let mut branch_n = branch.clone(); branch_n.inner.untaken_choice = Some(val.0); drainer.add_input(pred, branch_n); } } B::CouldntReadMsg(port) => { // move to "blocked" assert!(!branch.inner.inbox.contains_key(&port)); drainer.add_output(predicate, branch); } B::CouldntCheckFiring(port) => { // sanity check let var = cu.inner.port_info.spec_var_for(port); assert!(predicate.query(var).is_none()); // keep forks in "unblocked" drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone()); drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch); } B::PutMsg(putter, payload) => { // sanity check assert_eq!(Some(&Putter), cu.inner.port_info.polarities.get(&putter)); // overwrite assignment let var = cu.inner.port_info.spec_var_for(putter); let was = predicate.assigned.insert(var, SpecVal::FIRING); if was == Some(SpecVal::SILENT) { log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); // discard forever drop((predicate, branch)); } else { // keep in "unblocked" branch.inner.did_put_or_get.insert(putter); log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; rctx.getter_buffer.putter_add(cu, putter, msg); drainer.add_input(predicate, branch); } } B::SyncBlockEnd => { // make concrete all variables for port in ports.iter() { let var = cu.inner.port_info.spec_var_for(*port); let should_have_fired = branch.inner.did_put_or_get.contains(port); let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); let did_fire = val == SpecVal::FIRING; if did_fire != should_have_fired { log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); // IMPLICIT inconsistency drop((predicate, branch)); return Ok(()); } } // submit solution for this component let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id)); rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.inner.logger, subtree_id, predicate.clone(), ); branch.ended = true; // move to "blocked" drainer.add_output(predicate, branch); } } Ok(()) }) } // fn branch_merge_func( // mut a: ProtoComponentBranch, // b: &mut ProtoComponentBranch, // ) -> ProtoComponentBranch { // if b.ended && !a.ended { // a.ended = true; // std::mem::swap(&mut a, b); // } // a // } fn feed_msg( &mut self, cu: &mut ConnectorUnphased, rctx: &mut RoundCtx, proto_component_id: ProtoComponentId, getter: PortId, send_payload_msg: &SendPayloadMsg, pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>, ) -> Result<(), UnrecoverableSyncError> { let logger = &mut *cu.inner.logger; log!( logger, "feeding proto component {:?} getter {:?} {:?}", proto_component_id, getter, &send_payload_msg ); let BranchingProtoComponent { branches, ports } = self; let (mut unblocked, pcb_temps) = pcb_temps.split_first_mut(); let (mut blocked, pcb_temps) = pcb_temps.split_first_mut(); // partition drain from branches -> {unblocked, blocked} log!(logger, "visiting {} blocked branches...", branches.len()); for (predicate, mut branch) in branches.drain() { if branch.ended { log!(logger, "Skipping ended branch with {:?}", &predicate); blocked.insert(predicate, branch); continue; } use AssignmentUnionResult as Aur; log!(logger, "visiting branch with pred {:?}", &predicate); match predicate.assignment_union(&send_payload_msg.predicate) { Aur::Nonexistant => { // this branch does not receive the message log!(logger, "skipping branch"); blocked.insert(predicate, branch); } Aur::Equivalent | Aur::FormerNotLatter => { // retain the existing predicate, but add this payload log!(logger, "feeding this branch without altering its predicate"); branch.feed_msg(getter, send_payload_msg.payload.clone()); unblocked.insert(predicate, branch); } Aur::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched log!(logger, "Forking this branch, giving it the predicate of the msg"); let mut branch2 = branch.clone(); let predicate2 = send_payload_msg.predicate.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); blocked.insert(predicate, branch); unblocked.insert(predicate2, branch2); } Aur::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched log!(logger, "Forking this branch with new predicate {:?}", &predicate2); let mut branch2 = branch.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); blocked.insert(predicate, branch); unblocked.insert(predicate2, branch2); } } } log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len()); // drain from unblocked --> blocked let (swap, _pcb_temps) = pcb_temps.split_first_mut(); let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0); BranchingProtoComponent::drain_branches_to_blocked( cd, cu, rctx, proto_component_id, ports, )?; // swap the blocked branches back std::mem::swap(blocked.0, branches); log!(cu.inner.logger, "component settles down with branches: {:?}", branches.keys()); Ok(()) } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; for (branch_predicate, branch) in branches { if branch.ended && branch_predicate.assigns_subset(solution_predicate) { let ProtoComponentBranch { state, .. } = branch; return ProtoComponent { state, ports }; } } panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); } fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false }; Self { ports, branches: hashmap! { Predicate::default() => branch } } } } impl SolutionStorage { fn new(subtree_ids: impl Iterator) -> Self { let mut subtree_id_to_index: HashMap = Default::default(); let mut subtree_solutions = vec![]; for id in subtree_ids { subtree_id_to_index.insert(id, subtree_solutions.len()); subtree_solutions.push(Default::default()) } Self { subtree_solutions, subtree_id_to_index, old_local: Default::default(), new_local: Default::default(), } } // fn is_clear(&self) -> bool { // self.subtree_id_to_index.is_empty() // && self.subtree_solutions.is_empty() // && self.old_local.is_empty() // && self.new_local.is_empty() // } // fn clear(&mut self) { // self.subtree_id_to_index.clear(); // self.subtree_solutions.clear(); // self.old_local.clear(); // self.new_local.clear(); // } // fn reset(&mut self, subtree_ids: impl Iterator) { // self.subtree_id_to_index.clear(); // self.subtree_solutions.clear(); // self.old_local.clear(); // self.new_local.clear(); // for key in subtree_ids { // self.subtree_id_to_index.insert(key, self.subtree_solutions.len()); // self.subtree_solutions.push(Default::default()) // } // } pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { let Self { old_local, new_local, .. } = self; new_local.drain().map(move |local| { old_local.insert(local.clone()); local }) } pub(crate) fn submit_and_digest_subtree_solution( &mut self, logger: &mut dyn Logger, subtree_id: SubtreeId, predicate: Predicate, ) { log!(logger, "++ new component solution {:?} {:?}", subtree_id, &predicate); let index = self.subtree_id_to_index[&subtree_id]; let left = 0..index; let right = (index + 1)..self.subtree_solutions.len(); let Self { subtree_solutions, new_local, old_local, .. } = self; let was_new = subtree_solutions[index].insert(predicate.clone()); if was_new { let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); Self::elaborate_into_new_local_rec( logger, predicate, set_visitor, old_local, new_local, ); } } fn elaborate_into_new_local_rec<'a, 'b>( logger: &mut dyn Logger, partial: Predicate, mut set_visitor: impl Iterator> + Clone, old_local: &'b HashSet, new_local: &'a mut HashSet, ) { if let Some(set) = set_visitor.next() { // incomplete solution. keep traversing for pred in set.iter() { if let Some(elaborated) = pred.union_with(&partial) { Self::elaborate_into_new_local_rec( logger, elaborated, set_visitor.clone(), old_local, new_local, ) } } } else { // recursive stop condition. `partial` is a local subtree solution if !old_local.contains(&partial) { // ... and it hasn't been found before log!(logger, "storing NEW LOCAL SOLUTION {:?}", &partial); new_local.insert(partial); } } } } impl GetterBuffer { fn len(&self) -> usize { self.getters_and_sends.len() } fn pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { self.getters_and_sends.pop() } fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) { self.getters_and_sends.push((getter, msg)); } fn putter_add(&mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) { if let Some(&getter) = cu.inner.port_info.peers.get(&putter) { self.getter_add(getter, msg); } else { log!(cu.inner.logger, "Putter {:?} has no known peer!", putter); panic!("Putter {:?} has no known peer!"); } } } impl SyncProtoContext<'_> { pub(crate) fn is_firing(&mut self, port: PortId) -> Option { let var = self.cu_inner.port_info.spec_var_for(port); self.predicate.query(var).map(SpecVal::is_firing) } pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { self.branch_inner.did_put_or_get.insert(port); self.branch_inner.inbox.get(&port) } pub(crate) fn take_choice(&mut self) -> Option { self.branch_inner.untaken_choice.take() } } impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { fn add_input(&mut self, k: K, v: V) { self.swap.insert(k, v); } // fn merge_input_with V>(&mut self, k: K, v: V, mut func: F) { // use std::collections::hash_map::Entry; // let e = self.swap.entry(k); // match e { // Entry::Vacant(ev) => { // ev.insert(v); // } // Entry::Occupied(mut eo) => { // let old = eo.get_mut(); // *old = func(v, old); // } // } // } fn add_output(&mut self, k: K, v: V) { self.output.insert(k, v); } } impl NonsyncProtoContext<'_> { pub fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { // called by a PROTO COMPONENT. moves its own ports. // 1. sanity check: this component owns these ports log!( self.cu_inner.logger, "Component {:?} added new component with state {:?}, moving ports {:?}", self.proto_component_id, &state, &moved_ports ); assert!(self.proto_component_ports.is_subset(&moved_ports)); // 2. remove ports from old component & update port->route let new_id = self.cu_inner.id_manager.new_proto_component_id(); for port in moved_ports.iter() { self.proto_component_ports.remove(port); self.cu_inner .port_info .routes .insert(*port, Route::LocalComponent(ComponentId::Proto(new_id))); } // 3. create a new component self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); } pub fn new_port_pair(&mut self) -> [PortId; 2] { // adds two new associated ports, related to each other, and exposed to the proto component let [o, i] = [self.cu_inner.id_manager.new_port_id(), self.cu_inner.id_manager.new_port_id()]; self.proto_component_ports.insert(o); self.proto_component_ports.insert(i); // {polarity, peer, route} known. {} unknown. self.cu_inner.port_info.polarities.insert(o, Putter); self.cu_inner.port_info.polarities.insert(i, Getter); self.cu_inner.port_info.peers.insert(o, i); self.cu_inner.port_info.peers.insert(i, o); let route = Route::LocalComponent(ComponentId::Proto(self.proto_component_id)); self.cu_inner.port_info.routes.insert(o, route); self.cu_inner.port_info.routes.insert(i, route); log!( self.cu_inner.logger, "Component {:?} port pair (out->in) {:?} -> {:?}", self.proto_component_id, o, i ); [o, i] } } impl ProtoComponentBranch { fn feed_msg(&mut self, getter: PortId, payload: Payload) { let was = self.inner.inbox.insert(getter, payload); assert!(was.is_none()) } } impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { fn new( input: &'a mut HashMap, swap: &'a mut HashMap, output: &'a mut HashMap, ) -> Self { Self { input, inner: CyclicDrainInner { swap, output } } } fn cyclic_drain( self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>) -> Result<(), E>, ) -> Result<(), E> { let Self { input, inner: CyclicDrainInner { swap, output } } = self; // assert!(swap.is_empty()); while !input.is_empty() { for (k, v) in input.drain() { func(k, v, CyclicDrainInner { swap, output })? } std::mem::swap(input, swap); } Ok(()) } }