diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs new file mode 100644 index 0000000000000000000000000000000000000000..ed3c75146dcbd158ade0e1044c0438d372bfab38 --- /dev/null +++ b/src/runtime/actors.rs @@ -0,0 +1,250 @@ +use crate::common::*; +use crate::runtime::{endpoint::*, *}; + +#[derive(Debug)] +pub(crate) struct MonoN { + pub ekeys: HashSet, + pub result: Option<(usize, HashMap)>, +} +#[derive(Debug)] +pub(crate) struct PolyN { + pub ekeys: HashSet, + pub branches: HashMap, +} +#[derive(Debug, Clone)] +pub(crate) struct BranchN { + pub to_get: HashSet, + pub gotten: HashMap, + pub sync_batch_index: usize, +} + +#[derive(Debug)] +pub struct MonoP { + pub state: ProtocolS, + pub ekeys: HashSet, +} +#[derive(Debug)] +pub(crate) struct PolyP { + pub incomplete: HashMap, + pub complete: HashMap, + pub ekeys: HashSet, +} +#[derive(Debug, Clone)] +pub(crate) struct BranchP { + pub inbox: HashMap, + pub state: ProtocolS, +} + +////////////////////////////////////////////////////////////////// + +impl PolyP { + pub(crate) fn poly_run( + &mut self, + m_ctx: PolyPContext, + protocol_description: &ProtocolD, + ) -> Result { + let to_run: Vec<_> = self.incomplete.drain().collect(); + self.poly_run_these_branches(m_ctx, protocol_description, to_run) + } + + pub(crate) fn poly_run_these_branches( + &mut self, + mut m_ctx: PolyPContext, + protocol_description: &ProtocolD, + mut to_run: Vec<(Predicate, BranchP)>, + ) -> Result { + use SyncRunResult as Srr; + while let Some((mut predicate, mut branch)) = to_run.pop() { + let mut r_ctx = BranchPContext { + m_ctx: m_ctx.reborrow(), + ekeys: &self.ekeys, + predicate: &predicate, + inbox: &branch.inbox, + }; + use PolyBlocker as Sb; + let blocker = branch.state.sync_run(&mut r_ctx, protocol_description); + match blocker { + Sb::Inconsistent => {} // DROP + Sb::CouldntReadMsg(ekey) => { + assert!(self.ekeys.contains(&ekey)); + let channel_id = + r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + if predicate.replace_assignment(channel_id, true) != Some(false) { + // don't rerun now. Rerun at next `sync_run` + self.incomplete.insert(predicate, branch); + } + // ELSE DROP + } + Sb::CouldntCheckFiring(ekey) => { + assert!(self.ekeys.contains(&ekey)); + let channel_id = + r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + // split the branch! + let branch_f = branch.clone(); + let mut predicate_f = predicate.clone(); + if predicate_f.replace_assignment(channel_id, false).is_some() { + panic!("OI HANS QUERY FIRST!"); + } + assert!(predicate.replace_assignment(channel_id, true).is_none()); + to_run.push((predicate, branch)); + to_run.push((predicate_f, branch_f)); + } + Sb::SyncBlockEnd => { + // come up with the predicate for this local solution + let ekeys_channel_id_iter = self + .ekeys + .iter() + .map(|&ekey| m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id); + predicate.batch_assign_nones(ekeys_channel_id_iter, false); + // report the local solution + m_ctx + .solution_storage + .submit_and_digest_subtree_solution(m_ctx.my_subtree_id, predicate.clone()); + // store the solution for recovering later + self.complete.insert(predicate, branch); + } + Sb::PutMsg(ekey, payload) => { + assert!(self.ekeys.contains(&ekey)); + let EndpointExt { info, endpoint } = + m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap(); + if predicate.replace_assignment(info.channel_id, true) != Some(false) { + let msg = CommMsgContents::SendPayload { + payload_predicate: predicate.clone(), + payload, + } + .into_msg(m_ctx.inner.round_index); + endpoint.send(msg)?; + to_run.push((predicate, branch)); + } + // ELSE DROP + } + } + } + // all in self.incomplete most recently returned Blocker::CouldntReadMsg + Ok(if self.incomplete.is_empty() { + if self.complete.is_empty() { + Srr::NoBranches + } else { + Srr::AllBranchesComplete + } + } else { + Srr::BlockingForRecv + }) + } + + pub(crate) fn poly_recv_run( + &mut self, + m_ctx: PolyPContext, + protocol_description: &ProtocolD, + ekey: Key, + payload_predicate: Predicate, + payload: Payload, + ) -> Result { + // try exact match + let to_run = if self.complete.contains_key(&payload_predicate) { + // exact match with stopped machine + vec![] + } else if let Some(mut branch) = self.incomplete.remove(&payload_predicate) { + // exact match with running machine + branch.inbox.insert(ekey, payload); + vec![(payload_predicate, branch)] + } else { + let mut incomplete2 = HashMap::<_, _>::default(); + let to_run = self + .incomplete + .drain() + .filter_map(|(old_predicate, mut branch)| { + use CommonSatResult as Csr; + match old_predicate.common_satisfier(&payload_predicate) { + Csr::FormerNotLatter | Csr::Equivalent => { + // old_predicate COVERS the assumptions of payload_predicate + let was = branch.inbox.insert(ekey, payload.clone()); + assert!(was.is_none()); // INBOX MUST BE EMPTY! + Some((old_predicate, branch)) + } + Csr::New(unified) => { + // payload_predicate has new assumptions. FORK! + let mut payload_branch = branch.clone(); + let was = payload_branch.inbox.insert(ekey, payload.clone()); + assert!(was.is_none()); // INBOX MUST BE EMPTY! + + // put the original back untouched + incomplete2.insert(old_predicate, branch); + Some((unified, payload_branch)) + } + Csr::LatterNotFormer => { + // payload_predicate has new assumptions. FORK! + let mut payload_branch = branch.clone(); + let was = payload_branch.inbox.insert(ekey, payload.clone()); + assert!(was.is_none()); // INBOX MUST BE EMPTY! + + // put the original back untouched + incomplete2.insert(old_predicate, branch); + Some((payload_predicate.clone(), payload_branch)) + } + Csr::Nonexistant => { + // predicates contradict + incomplete2.insert(old_predicate, branch); + None + } + } + }) + .collect(); + std::mem::swap(&mut self.incomplete, &mut incomplete2); + to_run + }; + self.poly_run_these_branches(m_ctx, protocol_description, to_run) + } + + pub(crate) fn become_mono( + mut self, + decision: &Predicate, + all_inboxes: &mut HashMap, + ) -> MonoP { + if let Some((_, branch)) = self.complete.drain().find(|(p, _)| decision.satisfies(p)) { + let BranchP { inbox, state } = branch; + for (key, payload) in inbox { + assert!(all_inboxes.insert(key, payload).is_none()); + } + self.incomplete.clear(); + MonoP { state, ekeys: self.ekeys } + } else { + panic!("No such solution!") + } + } +} + +impl PolyN { + pub fn sync_recv( + &mut self, + ekey: Key, + payload: Payload, + solution_storage: &mut SolutionStorage, + ) { + for (predicate, branch) in self.branches.iter_mut() { + if branch.to_get.remove(&ekey) { + branch.gotten.insert(ekey, payload.clone()); + if branch.to_get.is_empty() { + solution_storage + .submit_and_digest_subtree_solution(SubtreeId::PolyN, predicate.clone()); + } + } + } + } + + pub fn become_mono( + mut self, + decision: &Predicate, + all_inboxes: &mut HashMap, + ) -> MonoN { + if let Some((_, branch)) = self.branches.drain().find(|(p, _)| decision.satisfies(p)) { + let BranchN { gotten, sync_batch_index, .. } = branch; + for (&key, payload) in gotten.iter() { + assert!(all_inboxes.insert(key, payload.clone()).is_none()); + } + MonoN { ekeys: self.ekeys, result: Some((sync_batch_index, gotten)) } + } else { + panic!("No such solution!") + } + } +}