diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index c9ea8944f8b1ae6e53be625f437cc000cd5a16b2..bb402ce1bf5db6f999db1e2072a20671ea22097d 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -1,7 +1,7 @@ use crate::common::*; use crate::runtime::{endpoint::*, *}; -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct MonoN { pub ekeys: HashSet, pub result: Option<(usize, HashMap)>, @@ -18,7 +18,7 @@ pub(crate) struct BranchN { pub sync_batch_index: usize, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MonoP { pub state: ProtocolS, pub ekeys: HashSet, @@ -31,6 +31,7 @@ pub(crate) struct PolyP { } #[derive(Debug, Clone)] pub(crate) struct BranchP { + pub blocking_on: Option, pub outbox: HashMap, pub inbox: HashMap, pub state: ProtocolS, @@ -89,6 +90,7 @@ impl PolyP { // don't rerun now. Rerun at next `sync_run` log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,); + branch.blocking_on = Some(ekey); self.incomplete.insert(predicate, branch); } else { log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,); @@ -214,12 +216,16 @@ impl PolyP { &payload_predicate ); branch.inbox.insert(ekey, payload); - vec![(payload_predicate, branch)] + if branch.blocking_on == Some(ekey) { + branch.blocking_on = None; + vec![(payload_predicate, branch)] + } else { + vec![] + } } else { log!( &mut m_ctx.inner.logger, "... poly_recv_run didn't have any exact matches... Let's try feed it to all branches", - ); let mut incomplete2 = HashMap::<_, _>::default(); let to_run = self @@ -241,7 +247,15 @@ impl PolyP { assert_eq!(prev_payload, &payload); } branch.inbox.insert(ekey, payload.clone()); - Some((old_predicate, branch)) + if branch.blocking_on == Some(ekey) { + // run. + branch.blocking_on = None; + Some((old_predicate, branch)) + } else { + // don't bother running. its awaiting something else + incomplete2.insert(old_predicate, branch); + None + } } Csr::New(new) => { log!( @@ -261,7 +275,15 @@ impl PolyP { // put the original back untouched incomplete2.insert(old_predicate, branch); - Some((new, payload_branch)) + if payload_branch.blocking_on == Some(ekey) { + // run the fork + payload_branch.blocking_on = None; + Some((new, payload_branch)) + } else { + // don't bother running. its awaiting something else + incomplete2.insert(new, payload_branch); + None + } } Csr::LatterNotFormer => { log!( @@ -279,8 +301,16 @@ impl PolyP { payload_branch.inbox.insert(ekey, payload.clone()); // put the original back untouched - incomplete2.insert(old_predicate, branch); - Some((payload_predicate.clone(), payload_branch)) + incomplete2.insert(old_predicate.clone(), branch); + if payload_branch.blocking_on == Some(ekey) { + // run the fork + payload_branch.blocking_on = None; + Some((old_predicate, payload_branch)) + } else { + // don't bother running. its awaiting something else + incomplete2.insert(old_predicate, payload_branch); + None + } } Csr::Nonexistant => { log!( @@ -307,21 +337,11 @@ impl PolyP { self.poly_run_these_branches(m_ctx, protocol_description, to_run) } - pub(crate) fn become_mono( - mut self, - decision: &Predicate, - table_row: &mut HashMap, - ) -> MonoP { - if let Some((_, branch)) = self.complete.drain().find(|(p, _)| decision.satisfies(p)) { - let BranchP { inbox, state, outbox } = branch; - for (key, payload) in inbox.into_iter().chain(outbox.into_iter()) { - table_row.insert(key, payload); - } - self.incomplete.clear(); - MonoP { state, ekeys: self.ekeys } - } else { - panic!("No such solution!") - } + pub(crate) fn choose_mono(&self, decision: &Predicate) -> Option { + self.complete + .iter() + .find(|(p, _)| decision.satisfies(p)) + .map(|(_, branch)| MonoP { state: branch.state.clone(), ekeys: self.ekeys.clone() }) } } @@ -396,32 +416,13 @@ impl PolyN { std::mem::swap(&mut branches2, &mut self.branches); } - pub fn become_mono( - mut self, - logger: &mut String, - decision: &Predicate, - table_row: &mut HashMap, - ) -> MonoN { - log!( - logger, - "decision {:?} with branch preds {:?}", - decision, - self.branches.iter().collect::>() - ); - if let Some((branch_pred, branch)) = self - .branches - .drain() + pub fn choose_mono(&self, decision: &Predicate) -> Option { + self.branches + .iter() .find(|(p, branch)| branch.to_get.is_empty() && decision.satisfies(p)) - { - log!(logger, "decision {:?} mapped to branch {:?}", decision, branch_pred); - let BranchN { gotten, sync_batch_index, .. } = branch; - for (&key, payload) in gotten.iter() { - assert!(table_row.insert(key, payload.clone()).is_none()); - } - MonoN { ekeys: self.ekeys, result: Some((sync_batch_index, gotten)) } - } else { - log!(logger, "decision {:?} HAD NO SOLUTION!!?", decision); - panic!("No such solution!") - } + .map(|(_, branch)| { + let BranchN { gotten, sync_batch_index, .. } = branch.clone(); + MonoN { ekeys: self.ekeys.clone(), result: Some((sync_batch_index, gotten)) } + }) } }