diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index d47ad39644c7b60806e31e35bdf2f692227d0704..0f8cd88f76b65de172c197a066c6308719cd6230 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -3,37 +3,37 @@ use crate::runtime::{endpoint::*, *}; #[derive(Debug, Clone)] pub(crate) struct MonoN { - pub ekeys: HashSet, - pub result: Option<(usize, HashMap)>, + pub ports: HashSet, + pub result: Option<(usize, HashMap)>, } #[derive(Debug)] pub(crate) struct PolyN { - pub ekeys: HashSet, + pub ports: HashSet, pub branches: HashMap, } #[derive(Debug, Clone)] pub(crate) struct BranchN { - pub to_get: HashSet, - pub gotten: HashMap, + pub to_get: HashSet, + pub gotten: HashMap, pub sync_batch_index: usize, } #[derive(Debug, Clone)] pub struct MonoP { pub state: ProtocolS, - pub ekeys: HashSet, + pub ports: HashSet, } #[derive(Debug)] pub(crate) struct PolyP { pub incomplete: HashMap, pub complete: HashMap, - pub ekeys: HashSet, + pub ports: HashSet, } #[derive(Debug, Clone)] pub(crate) struct BranchP { - pub blocking_on: Option, - pub outbox: HashMap, - pub inbox: HashMap, + pub blocking_on: Option, + pub outbox: HashMap, + pub inbox: HashMap, pub state: ProtocolS, } @@ -60,7 +60,7 @@ impl PolyP { 'to_run_loop: while let Some((mut predicate, mut branch)) = to_run.pop() { let mut r_ctx = BranchPContext { m_ctx: m_ctx.reborrow(), - ekeys: &self.ekeys, + ports: &self.ports, predicate: &predicate, inbox: &branch.inbox, }; @@ -75,10 +75,10 @@ impl PolyP { ); match blocker { Sb::Inconsistent => {} // DROP - Sb::CouldntReadMsg(ekey) => { - assert!(self.ekeys.contains(&ekey)); + Sb::CouldntReadMsg(port) => { + assert!(self.ports.contains(&port)); let channel_id = - r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + r_ctx.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; log!( &mut r_ctx.m_ctx.inner.logger, "~ ... {:?} couldnt read msg for port {:?}. has inbox {:?}", @@ -90,17 +90,17 @@ impl PolyP { // don't rerun now. Rerun at next `sync_run` log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,); - branch.blocking_on = Some(ekey); + branch.blocking_on = Some(port); self.incomplete.insert(predicate, branch); } else { log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,); } // ELSE DROP } - Sb::CouldntCheckFiring(ekey) => { - assert!(self.ekeys.contains(&ekey)); + Sb::CouldntCheckFiring(port) => { + assert!(self.ports.contains(&port)); let channel_id = - r_ctx.m_ctx.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; + r_ctx.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; // split the branch! let branch_f = branch.clone(); let mut predicate_f = predicate.clone(); @@ -121,10 +121,10 @@ impl PolyP { ); // come up with the predicate for this local solution - for ekey in self.ekeys.iter() { - let channel_id = endpoint_exts.get(*ekey).unwrap().info.channel_id; + for port in self.ports.iter() { + let channel_id = endpoint_exts.get(*port).unwrap().info.channel_id; let fired = - branch.inbox.contains_key(ekey) || branch.outbox.contains_key(ekey); + branch.inbox.contains_key(port) || branch.outbox.contains_key(port); match predicate.query(channel_id) { Some(true) => { if !fired { @@ -142,8 +142,8 @@ impl PolyP { println!( "pred {:#?} in {:#?} out {:#?}", &predicate, - branch.inbox.get(ekey), - branch.outbox.get(ekey) + branch.inbox.get(port), + branch.outbox.get(port) ); panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had Some(false)!" ,channel_id) } @@ -154,8 +154,8 @@ impl PolyP { println!( "pred {:#?} in {:#?} out {:#?}", &predicate, - branch.inbox.get(ekey), - branch.outbox.get(ekey) + branch.inbox.get(port), + branch.outbox.get(port) ); panic!("channel_id {:?} fired (based on outbox/inbox) but the predicate had None!" ,channel_id) } @@ -170,12 +170,12 @@ impl PolyP { ); self.complete.insert(predicate, branch); } - Sb::PutMsg(ekey, payload) => { - assert!(self.ekeys.contains(&ekey)); + Sb::PutMsg(port, payload) => { + assert!(self.ports.contains(&port)); let EndpointExt { info, endpoint } = - m_ctx.inner.endpoint_exts.get_mut(ekey).unwrap(); + m_ctx.inner.endpoint_exts.get_mut(port).unwrap(); if predicate.replace_assignment(info.channel_id, true) != Some(false) { - branch.outbox.insert(ekey, payload.clone()); + branch.outbox.insert(port, payload.clone()); let msg = CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload, @@ -185,7 +185,7 @@ impl PolyP { &mut m_ctx.inner.logger, "~ ... ... PolyP sending msg {:?} to {:?} ({:?}) now!", &msg, - ekey, + port, (info.channel_id.controller_id, info.channel_id.channel_index), ); endpoint.send(msg)?; @@ -211,7 +211,7 @@ impl PolyP { &mut self, m_ctx: PolyPContext, protocol_description: &ProtocolD, - ekey: Key, + port: Port, payload_predicate: Predicate, payload: Payload, ) -> Result { @@ -233,8 +233,8 @@ impl PolyP { "... poly_recv_run matched running machine exactly! pred is {:?}", &payload_predicate ); - branch.inbox.insert(ekey, payload); - if branch.blocking_on == Some(ekey) { + branch.inbox.insert(port, payload); + if branch.blocking_on == Some(port) { branch.blocking_on = None; vec![(payload_predicate, branch)] } else { @@ -260,12 +260,12 @@ impl PolyP { ); // old_predicate COVERS the assumptions of payload_predicate - if let Some(prev_payload) = branch.inbox.get(&ekey) { + if let Some(prev_payload) = branch.inbox.get(&port) { // Incorrect to receive two distinct messages in same branch! assert_eq!(prev_payload, &payload); } - branch.inbox.insert(ekey, payload.clone()); - if branch.blocking_on == Some(ekey) { + branch.inbox.insert(port, payload.clone()); + if branch.blocking_on == Some(port) { // run. branch.blocking_on = None; Some((old_predicate, branch)) @@ -285,15 +285,15 @@ impl PolyP { ); // payload_predicate has new assumptions. FORK! let mut payload_branch = branch.clone(); - if let Some(prev_payload) = payload_branch.inbox.get(&ekey) { + if let Some(prev_payload) = payload_branch.inbox.get(&port) { // Incorrect to receive two distinct messages in same branch! assert_eq!(prev_payload, &payload); } - payload_branch.inbox.insert(ekey, payload.clone()); + payload_branch.inbox.insert(port, payload.clone()); // put the original back untouched incomplete2.insert(old_predicate, branch); - if payload_branch.blocking_on == Some(ekey) { + if payload_branch.blocking_on == Some(port) { // run the fork payload_branch.blocking_on = None; Some((new, payload_branch)) @@ -312,15 +312,15 @@ impl PolyP { ); // payload_predicate has new assumptions. FORK! let mut payload_branch = branch.clone(); - if let Some(prev_payload) = payload_branch.inbox.get(&ekey) { + if let Some(prev_payload) = payload_branch.inbox.get(&port) { // Incorrect to receive two distinct messages in same branch! assert_eq!(prev_payload, &payload); } - payload_branch.inbox.insert(ekey, payload.clone()); + payload_branch.inbox.insert(port, payload.clone()); // put the original back untouched incomplete2.insert(old_predicate.clone(), branch); - if payload_branch.blocking_on == Some(ekey) { + if payload_branch.blocking_on == Some(port) { // run the fork payload_branch.blocking_on = None; Some((payload_predicate.clone(), payload_branch)) @@ -359,14 +359,14 @@ impl PolyP { self.complete .iter() .find(|(p, _)| decision.satisfies(p)) - .map(|(_, branch)| MonoP { state: branch.state.clone(), ekeys: self.ekeys.clone() }) + .map(|(_, branch)| MonoP { state: branch.state.clone(), ports: self.ports.clone() }) } } impl PolyN { pub fn sync_recv( &mut self, - ekey: Key, + port: Port, logger: &mut String, payload: Payload, payload_predicate: Predicate, @@ -399,16 +399,16 @@ impl PolyN { Csr::Nonexistant => { /* skip branch */ } Csr::LatterNotFormer | Csr::Equivalent => { // Feed the message to this branch in-place. no need to modify pred. - if branch.to_get.remove(&ekey) { - branch.gotten.insert(ekey, payload.clone()); + if branch.to_get.remove(&port) { + branch.gotten.insert(port, payload.clone()); report_if_solution(&branch, &old_predicate, logger); } } Csr::FormerNotLatter => { // create a new branch with the payload_predicate. let mut forked = branch.clone(); - if forked.to_get.remove(&ekey) { - forked.gotten.insert(ekey, payload.clone()); + if forked.to_get.remove(&port) { + forked.gotten.insert(port, payload.clone()); report_if_solution(&forked, &payload_predicate, logger); branches2.insert(payload_predicate.clone(), forked); } @@ -416,8 +416,8 @@ impl PolyN { Csr::New(new) => { // create a new branch with the newly-created predicate let mut forked = branch.clone(); - if forked.to_get.remove(&ekey) { - forked.gotten.insert(ekey, payload.clone()); + if forked.to_get.remove(&port) { + forked.gotten.insert(port, payload.clone()); report_if_solution(&forked, &new, logger); branches2.insert(new.clone(), forked); } @@ -436,7 +436,7 @@ impl PolyN { .find(|(p, branch)| branch.to_get.is_empty() && decision.satisfies(p)) .map(|(_, branch)| { let BranchN { gotten, sync_batch_index, .. } = branch.clone(); - MonoN { ekeys: self.ekeys.clone(), result: Some((sync_batch_index, gotten)) } + MonoN { ports: self.ports.clone(), result: Some((sync_batch_index, gotten)) } }) } }