diff --git a/src/runtime/retired/actors.rs b/src/runtime/retired/actors.rs new file mode 100644 index 0000000000000000000000000000000000000000..c86044f404990f03c55c5a2ed2b9f47aa472c1ee --- /dev/null +++ b/src/runtime/retired/actors.rs @@ -0,0 +1,442 @@ +use crate::common::*; +use crate::runtime::{endpoint::*, *}; + +#[derive(Debug, Clone)] +pub(crate) struct MonoN { + pub ports: HashSet, + pub result: Option<(usize, HashMap)>, +} +#[derive(Debug)] +pub(crate) struct PolyN { + pub ports: HashSet, + pub branches: HashMap, +} +#[derive(Debug, Clone)] +pub(crate) struct BranchN { + pub to_get: HashSet, + pub gotten: HashMap, + pub sync_batch_index: usize, +} + +#[derive(Debug, Clone)] +pub struct MonoP { + pub state: ProtocolS, + pub ports: HashSet, +} +#[derive(Debug)] +pub(crate) struct PolyP { + pub incomplete: HashMap, + pub complete: HashMap, + pub ports: HashSet, +} +#[derive(Debug, Clone)] +pub(crate) struct BranchP { + pub blocking_on: Option, + pub outbox: HashMap, + pub inbox: HashMap, + pub state: ProtocolS, +} + +////////////////////////////////////////////////////////////////// + +impl PolyP { + pub(crate) fn poly_run( + &mut self, + m_ctx: PolyPContext, + protocol_description: &ProtocolD, + ) -> Result { + let to_run: Vec<_> = self.incomplete.drain().collect(); + self.poly_run_these_branches(m_ctx, protocol_description, to_run) + } + + pub(crate) fn poly_run_these_branches( + &mut self, + mut m_ctx: PolyPContext, + protocol_description: &ProtocolD, + mut to_run: Vec<(Predicate, BranchP)>, + ) -> Result { + use SyncRunResult as Srr; + log!(&mut m_ctx.inner.logger, "~ Running branches for PolyP {:?}!", m_ctx.my_subtree_id,); + 'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() { + let mut r_ctx = BranchPContext { + m_ctx: m_ctx.reborrow(), + ports: &self.ports, + predicate: &predicate, + inbox: &branch.inbox, + }; + use PolyBlocker as Sb; + let blocker = branch.state.sync_run(&mut r_ctx, protocol_description); + log!( + &mut r_ctx.m_ctx.inner.logger, + "~ ... ran PolyP {:?} with branch pred {:?} to blocker {:?}", + r_ctx.m_ctx.my_subtree_id, + &predicate, + &blocker + ); + match blocker { + Sb::Inconsistent => {} // DROP + Sb::CouldntReadMsg(port) => { + assert!(self.ports.contains(&port)); + let channel_id = + r_ctx.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; + log!( + &mut r_ctx.m_ctx.inner.logger, + "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}", + r_ctx.m_ctx.my_subtree_id, + channel_id, + &branch.inbox, + ); + if predicate.replace_assignment(channel_id, true) != Some(false) { + // don't rerun now. Rerun at next `sync_run` + + log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,); + branch.blocking_on = Some(port); + self.incomplete.insert(predicate, branch); + } else { + log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,); + } + // ELSE DROP + } + Sb::CouldntCheckFiring(port) => { + assert!(self.ports.contains(&port)); + let channel_id = + r_ctx.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; + // split the branch! + let branch_f = branch.clone(); + let mut predicate_f = predicate.clone(); + if predicate_f.replace_assignment(channel_id, false).is_some() { + panic!("OI HANS QUERY FIRST!"); + } + assert!(predicate.replace_assignment(channel_id, true).is_none()); + to_run.push((predicate, branch)); + to_run.push((predicate_f, branch_f)); + } + Sb::SyncBlockEnd => { + let ControllerInner { logger, endpoint_exts, .. } = m_ctx.inner; + log!( + logger, + "~ ... ran {:?} reached SyncBlockEnd with pred {:?} ...", + m_ctx.my_subtree_id, + &predicate, + ); + // come up with the predicate for this local solution + + for port in self.ports.iter() { + let channel_id = endpoint_exts.get(*port).unwrap().info.channel_id; + let fired = + branch.inbox.contains_key(port) || branch.outbox.contains_key(port); + match predicate.query(channel_id) { + Some(true) => { + if !fired { + // This branch should have fired but didn't! + log!( + logger, + "~ ... ... should have fired {:?} and didn't! pruning!", + channel_id, + ); + continue 'to_run_loop; + } + } + Some(false) => { + if fired { + println!( + "pred {:#?} in {:#?} out {:#?}", + &predicate, + branch.inbox.get(port), + branch.outbox.get(port) + ); + panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had Some(false)!" ,channel_id) + } + } + None => { + predicate.replace_assignment(channel_id, false); + if fired { + println!( + "pred {:#?} in {:#?} out {:#?}", + &predicate, + branch.inbox.get(port), + branch.outbox.get(port) + ); + panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had None!" ,channel_id) + } + } + } + } + log!(logger, "~ ... ... and finished just fine!",); + m_ctx.solution_storage.submit_and_digest_subtree_solution( + &mut m_ctx.inner.logger, + m_ctx.my_subtree_id, + predicate.clone(), + ); + self.complete.insert(predicate, branch); + } + Sb::PutMsg(port, payload) => { + assert!(self.ports.contains(&port)); + let EndpointExt { info, endpoint } = + m_ctx.inner.endpoint_exts.get_mut(port).unwrap(); + if predicate.replace_assignment(info.channel_id, true) != Some(false) { + branch.outbox.insert(port, payload.clone()); + let msg = CommMsgContents::SendPayload { + payload_predicate: predicate.clone(), + payload, + } + .into_msg(m_ctx.inner.round_index); + log!( + &mut m_ctx.inner.logger, + "~ ... ... PolyP sending msg {:?} to {:?} ({:?}) now!", + &msg, + port, + (info.channel_id.controller_id, info.channel_id.channel_index), + ); + endpoint.send(msg)?; + to_run.push((predicate, branch)); + } + // ELSE DROP + } + } + } + // all in self.incomplete most recently returned Blocker::CouldntReadMsg + Ok(if self.incomplete.is_empty() { + if self.complete.is_empty() { + Srr::NoBranches + } else { + Srr::AllBranchesComplete + } + } else { + Srr::BlockingForRecv + }) + } + + pub(crate) fn poly_recv_run( + &mut self, + m_ctx: PolyPContext, + protocol_description: &ProtocolD, + port: PortId, + payload_predicate: Predicate, + payload: Payload, + ) -> Result { + // try exact match + + let to_run = if self.complete.contains_key(&payload_predicate) { + // exact match with stopped machine + + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run matched stopped machine exactly! nothing to do here", + ); + vec![] + } else if let Some(mut branch) = self.incomplete.remove(&payload_predicate) { + // exact match with running machine + + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run matched running machine exactly! pred is {:?}", + &payload_predicate + ); + branch.inbox.insert(port, payload); + if branch.blocking_on == Some(port) { + branch.blocking_on = None; + vec![(payload_predicate, branch)] + } else { + vec![] + } + } else { + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run didn't have any exact matches... Let's try feed it to all branches", + ); + let mut incomplete2 = HashMap::<_, _>::default(); + let to_run = self + .incomplete + .drain() + .filter_map(|(old_predicate, mut branch)| { + use CommonSatResult as Csr; + match old_predicate.common_satisfier(&payload_predicate) { + Csr::FormerNotLatter | Csr::Equivalent => { + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run This branch is compatible unaltered! branch pred: {:?}", + &old_predicate + ); + // old_predicate COVERS the assumptions of payload_predicate + + if let Some(prev_payload) = branch.inbox.get(&port) { + // Incorrect to receive two distinct messages in same branch! + assert_eq!(prev_payload, &payload); + } + branch.inbox.insert(port, payload.clone()); + if branch.blocking_on == Some(port) { + // run. + branch.blocking_on = None; + Some((old_predicate, branch)) + } else { + // don't bother running. its awaiting something else + incomplete2.insert(old_predicate, branch); + None + } + } + Csr::New(new) => { + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run payloadpred {:?} and branchpred {:?} satisfied by new pred {:?}. FORKING", + &payload_predicate, + &old_predicate, + &new, + ); + // payload_predicate has new assumptions. FORK! + let mut payload_branch = branch.clone(); + if let Some(prev_payload) = payload_branch.inbox.get(&port) { + // Incorrect to receive two distinct messages in same branch! + assert_eq!(prev_payload, &payload); + } + payload_branch.inbox.insert(port, payload.clone()); + + // put the original back untouched + incomplete2.insert(old_predicate, branch); + if payload_branch.blocking_on == Some(port) { + // run the fork + payload_branch.blocking_on = None; + Some((new, payload_branch)) + } else { + // don't bother running. its awaiting something else + incomplete2.insert(new, payload_branch); + None + } + } + Csr::LatterNotFormer => { + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run payloadpred {:?} subsumes branch pred {:?}. FORKING", + &old_predicate, + &payload_predicate, + ); + // payload_predicate has new assumptions. FORK! + let mut payload_branch = branch.clone(); + if let Some(prev_payload) = payload_branch.inbox.get(&port) { + // Incorrect to receive two distinct messages in same branch! + assert_eq!(prev_payload, &payload); + } + payload_branch.inbox.insert(port, payload.clone()); + + // put the original back untouched + incomplete2.insert(old_predicate.clone(), branch); + if payload_branch.blocking_on == Some(port) { + // run the fork + payload_branch.blocking_on = None; + Some((payload_predicate.clone(), payload_branch)) + } else { + // don't bother running. its awaiting something else + incomplete2.insert(payload_predicate.clone(), payload_branch); + None + } + } + Csr::Nonexistant => { + log!( + &mut m_ctx.inner.logger, + "... poly_recv_run SKIPPING because branchpred={:?}. payloadpred={:?}", + &old_predicate, + &payload_predicate, + ); + // predicates contradict + incomplete2.insert(old_predicate, branch); + None + } + } + }) + .collect(); + std::mem::swap(&mut self.incomplete, &mut incomplete2); + to_run + }; + log!( + &mut m_ctx.inner.logger, + "... DONE FEEDING BRANCHES. {} branches to run!", + to_run.len(), + ); + self.poly_run_these_branches(m_ctx, protocol_description, to_run) + } + + pub(crate) fn choose_mono(&self, decision: &Predicate) -> Option { + self.complete + .iter() + .find(|(p, _)| decision.satisfies(p)) + .map(|(_, branch)| MonoP { state: branch.state.clone(), ports: self.ports.clone() }) + } +} + +impl PolyN { + pub fn sync_recv( + &mut self, + port: PortId, + logger: &mut String, + payload: Payload, + payload_predicate: Predicate, + solution_storage: &mut SolutionStorage, + ) { + let mut branches2: HashMap<_, _> = Default::default(); + for (old_predicate, mut branch) in self.branches.drain() { + use CommonSatResult as Csr; + let case = old_predicate.common_satisfier(&payload_predicate); + let mut report_if_solution = + |branch: &BranchN, pred: &Predicate, logger: &mut String| { + if branch.to_get.is_empty() { + log!(logger, "Native reporting solution with inbox {:#?}", &branch.gotten); + solution_storage.submit_and_digest_subtree_solution( + logger, + SubtreeId::PolyN, + pred.clone(), + ); + } + }; + log!( + logger, + "Feeding msg {:?} {:?} to native branch with pred {:?}. Predicate case {:?}", + &payload_predicate, + &payload, + &old_predicate, + &case + ); + match case { + Csr::Nonexistant => { /* skip branch */ } + Csr::LatterNotFormer | Csr::Equivalent => { + // Feed the message to this branch in-place. no need to modify pred. + if branch.to_get.remove(&port) { + branch.gotten.insert(port, payload.clone()); + report_if_solution(&branch, &old_predicate, logger); + } + } + Csr::FormerNotLatter => { + // create a new branch with the payload_predicate. + let mut forked = branch.clone(); + if forked.to_get.remove(&port) { + forked.gotten.insert(port, payload.clone()); + report_if_solution(&forked, &payload_predicate, logger); + branches2.insert(payload_predicate.clone(), forked); + } + } + Csr::New(new) => { + // create a new branch with the newly-created predicate + let mut forked = branch.clone(); + if forked.to_get.remove(&port) { + forked.gotten.insert(port, payload.clone()); + report_if_solution(&forked, &new, logger); + branches2.insert(new.clone(), forked); + } + } + } + // unlike PolyP machines, Native branches do NOT become inconsistent + branches2.insert(old_predicate, branch); + } + log!(logger, "Native now has branches {:#?}", &branches2); + std::mem::swap(&mut branches2, &mut self.branches); + } + + pub fn choose_mono(&self, decision: &Predicate) -> Option { + self.branches + .iter() + .find(|(p, branch)| branch.to_get.is_empty() && decision.satisfies(p)) + .map(|(_, branch)| { + let BranchN { gotten, sync_batch_index, .. } = branch.clone(); + MonoN { ports: self.ports.clone(), result: Some((sync_batch_index, gotten)) } + }) + } +}