diff --git a/Cargo.toml b/Cargo.toml index 7787c0f3a2a3d7b150d7156f5e66ab11b1a4ea34..a57be74c90133a042ec03459b3942a8d23c2227c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ lazy_static = "1.4.0" crate-type = ["cdylib"] [features] -default = ["ffi"] +default = ["ffi", "session_optimization"] ffi = [] # no feature dependencies -endpoint_logging = [] # see src/macros where a conditional check include endpoint logging \ No newline at end of file +endpoint_logging = [] # see src/macros.rs +session_optimization = [] # see src/runtime/setup.rs \ No newline at end of file diff --git a/src/common.rs b/src/common.rs index 2c816e862dd1bf006eccbb36e16c9d34dfdb78c7..feda52f86db093001f6840cd7c3ba370cdf87b61 100644 --- a/src/common.rs +++ b/src/common.rs @@ -41,7 +41,7 @@ pub struct Id { pub(crate) connector_id: ConnectorId, pub(crate) u32_suffix: PortSuffix, } -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct U32Stream { next: u32, } @@ -68,11 +68,6 @@ pub enum EndpointPolarity { Active, // calls connect() Passive, // calls bind() listen() accept() } -#[derive( - Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, -)] -pub(crate) struct FiringVar(pub(crate) PortId); - #[derive(Debug, Clone)] pub(crate) enum NonsyncBlocker { Inconsistent, @@ -165,11 +160,6 @@ impl Debug for PortId { write!(f, "ptID({}'{})", self.0.connector_id, self.0.u32_suffix) } } -impl Debug for FiringVar { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "fvID({}'{})", (self.0).0.connector_id, (self.0).0.u32_suffix) - } -} impl Debug for ProtoComponentId { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { write!(f, "pcID({}'{})", self.0.connector_id, self.0.u32_suffix) diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index da44b68e899e8ced9bd4c9888d6bb8cb2411a12a..779169ca54a1a4786de5439773d8066b78457ba1 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -156,6 +156,8 @@ impl Connector { comm.round_index ); + let mut spec_var_stream = cu.id_manager.new_spec_var_stream(); + // 1. run all proto components to Nonsync blockers let mut branching_proto_components = HashMap::::default(); @@ -224,23 +226,27 @@ impl Connector { "Translating {} native batches into branches...", comm.native_batches.len() ); + let native_branch_spec_var = spec_var_stream.next(); + log!(cu.logger, "Native branch spec var is {:?}", native_branch_spec_var); let mut branching_native = BranchingNative { branches: Default::default() }; - 'native_branches: for (index, NativeBatch { to_get, to_put }) in - comm.native_batches.drain(..).enumerate() + 'native_branches: for ((native_branch, index), branch_spec_val) in + comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain()) { + let NativeBatch { to_get, to_put } = native_branch; let predicate = { - let mut predicate = Predicate::default(); + let mut predicate = + Predicate::default().inserted(native_branch_spec_var, branch_spec_val); // assign trues for ports that fire let firing_ports: HashSet = to_get.iter().chain(to_put.keys()).copied().collect(); for &port in to_get.iter().chain(to_put.keys()) { - let var = cu.port_info.firing_var_for(port); - predicate.assigned.insert(var, true); + let var = cu.port_info.spec_var_for(port); + predicate.assigned.insert(var, SpecVal::FIRING); } // assign falses for silent ports for &port in cu.native_ports.difference(&firing_ports) { - let var = cu.port_info.firing_var_for(port); - if let Some(true) = predicate.assigned.insert(var, false) { + let var = cu.port_info.spec_var_for(port); + if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { log!(cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); continue 'native_branches; } @@ -269,8 +275,9 @@ impl Connector { ); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; - if let Some(existing) = branching_native.branches.insert(predicate, branch) { - return Err(Se::IndistinguishableBatches([index, existing.index])); + if let Some(_existing) = branching_native.branches.insert(predicate, branch) { + unreachable!() + // return Err(Se::IndistinguishableBatches([index, existing.index])); } } // restore the invariant @@ -645,7 +652,7 @@ impl BranchingNative { for (predicate, mut branch) in draining.drain() { log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate); // check if this branch expects to receive it - let var = cu.port_info.firing_var_for(getter); + let var = cu.port_info.spec_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); assert!(was.is_none()); @@ -665,7 +672,7 @@ impl BranchingNative { ); } }; - if predicate.query(var) != Some(true) { + if predicate.query(var) != Some(SpecVal::FIRING) { // optimization. Don't bother trying this branch log!( cu.logger, @@ -675,9 +682,9 @@ impl BranchingNative { finished.insert(predicate, branch); continue; } - use CommonSatResult as Csr; - match predicate.common_satisfier(&send_payload_msg.predicate) { - Csr::Nonexistant => { + use AllMapperResult as Amr; + match predicate.all_mapper(&send_payload_msg.predicate) { + Amr::Nonexistant => { // this branch does not receive the message log!( cu.logger, @@ -686,13 +693,13 @@ impl BranchingNative { ); finished.insert(predicate, branch); } - Csr::Equivalent | Csr::FormerNotLatter => { + Amr::Equivalent | Amr::FormerNotLatter => { // retain the existing predicate, but add this payload feed_branch(&mut branch, &predicate); log!(cu.logger, "branch pred covers it! Accept the msg"); finished.insert(predicate, branch); } - Csr::LatterNotFormer => { + Amr::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched let mut branch2 = branch.clone(); let predicate2 = send_payload_msg.predicate.clone(); @@ -706,7 +713,7 @@ impl BranchingNative { finished.insert(predicate, branch); finished.insert(predicate2, branch2); } - Csr::New(predicate2) => { + Amr::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched let mut branch2 = branch.clone(); feed_branch(&mut branch2, &predicate2); @@ -729,7 +736,7 @@ impl BranchingNative { self.branches.keys() ); for (branch_predicate, branch) in self.branches { - if branch.to_get.is_empty() && solution_predicate.satisfies(&branch_predicate) { + if branch.to_get.is_empty() && solution_predicate.consistent_with(&branch_predicate) { let NativeBranch { index, gotten, .. } = branch; log!(logger, "Collapsed native has gotten {:?}", &gotten); return RoundOk { batch_index: index, gotten }; @@ -775,8 +782,8 @@ impl BranchingProtoComponent { B::SyncBlockEnd => { // make concrete all variables for &port in ports.iter() { - let var = cu.port_info.firing_var_for(port); - predicate.assigned.entry(var).or_insert(false); + let var = cu.port_info.spec_var_for(port); + predicate.assigned.entry(var).or_insert(SpecVal::SILENT); } // submit solution for this component solution_storage.submit_and_digest_subtree_solution( @@ -795,19 +802,19 @@ impl BranchingProtoComponent { } B::CouldntCheckFiring(port) => { // sanity check - let var = cu.port_info.firing_var_for(port); + let var = cu.port_info.spec_var_for(port); assert!(predicate.query(var).is_none()); // keep forks in "unblocked" - drainer.add_input(predicate.clone().inserted(var, false), branch.clone()); - drainer.add_input(predicate.inserted(var, true), branch); + drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone()); + drainer.add_input(predicate.inserted(var, SpecVal::FIRING), branch); } B::PutMsg(putter, payload) => { // sanity check assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter)); // overwrite assignment - let var = cu.port_info.firing_var_for(putter); - let was = predicate.assigned.insert(var, true); - if was == Some(false) { + let var = cu.port_info.spec_var_for(putter); + let was = predicate.assigned.insert(var, SpecVal::FIRING); + if was == Some(SpecVal::SILENT) { log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); // discard forever drop((predicate, branch)); @@ -851,21 +858,21 @@ impl BranchingProtoComponent { blocked.insert(predicate, branch); continue; } - use CommonSatResult as Csr; + use AllMapperResult as Amr; log!(logger, "visiting branch with pred {:?}", &predicate); - match predicate.common_satisfier(&send_payload_msg.predicate) { - Csr::Nonexistant => { + match predicate.all_mapper(&send_payload_msg.predicate) { + Amr::Nonexistant => { // this branch does not receive the message log!(logger, "skipping branch"); blocked.insert(predicate, branch); } - Csr::Equivalent | Csr::FormerNotLatter => { + Amr::Equivalent | Amr::FormerNotLatter => { // retain the existing predicate, but add this payload log!(logger, "feeding this branch without altering its predicate"); branch.feed_msg(getter, send_payload_msg.payload.clone()); unblocked.insert(predicate, branch); } - Csr::LatterNotFormer => { + Amr::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched log!(logger, "Forking this branch, giving it the predicate of the msg"); let mut branch2 = branch.clone(); @@ -874,7 +881,7 @@ impl BranchingProtoComponent { blocked.insert(predicate, branch); unblocked.insert(predicate2, branch2); } - Csr::New(predicate2) => { + Amr::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched log!(logger, "Forking this branch with new predicate {:?}", &predicate2); let mut branch2 = branch.clone(); @@ -904,7 +911,7 @@ impl BranchingProtoComponent { fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; for (branch_predicate, branch) in branches { - if branch.ended && branch_predicate.satisfies(solution_predicate) { + if branch.ended && solution_predicate.consistent_with(&branch_predicate) { let ProtoComponentBranch { state, .. } = branch; return ProtoComponent { state, ports }; } @@ -1034,8 +1041,8 @@ impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> { } impl SyncProtoContext<'_> { pub(crate) fn is_firing(&mut self, port: PortId) -> Option { - let var = self.port_info.firing_var_for(port); - self.predicate.query(var) + let var = self.port_info.spec_var_for(port); + self.predicate.query(var).map(SpecVal::is_firing) } pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { self.inbox.get(&port) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index ea26830c70aa06f220aee2c86796a27c151ff5b1..cd2e179e27f9ca79d96f3972d9121b55177890a9 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -45,6 +45,15 @@ pub(crate) struct SyncProtoContext<'a> { port_info: &'a PortInfo, inbox: &'a HashMap, } + +#[derive( + Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, +)] +struct SpecVar(PortId); +#[derive( + Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, +)] +struct SpecVal(u16); #[derive(Debug)] struct RoundOk { batch_index: usize, @@ -115,7 +124,7 @@ struct SendPayloadMsg { payload: Payload, } #[derive(Debug, PartialEq)] -enum CommonSatResult { +enum AllMapperResult { FormerNotLatter, LatterNotFormer, Equivalent, @@ -153,6 +162,11 @@ struct IdManager { proto_component_suffix_stream: U32Stream, } #[derive(Debug)] +struct SpecVarStream { + connector_id: ConnectorId, + port_suffix_stream: U32Stream, +} +#[derive(Debug)] struct EndpointManager { // invariants: // 1. endpoint N is registered READ | WRITE with poller @@ -194,7 +208,7 @@ enum ConnectorPhased { } #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] struct Predicate { - assigned: BTreeMap, + assigned: BTreeMap, } #[derive(Debug, Default)] struct NativeBatch { @@ -232,13 +246,21 @@ impl VecSet { } } impl PortInfo { - fn firing_var_for(&self, port: PortId) -> FiringVar { - FiringVar(match self.polarities.get(&port).unwrap() { + fn spec_var_for(&self, port: PortId) -> SpecVar { + SpecVar(match self.polarities.get(&port).unwrap() { Getter => port, Putter => *self.peers.get(&port).unwrap(), }) } } +impl SpecVarStream { + fn next(&mut self) -> SpecVar { + let phantom_port: PortId = + Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() } + .into(); + SpecVar(phantom_port) + } +} impl IdManager { fn new(connector_id: ConnectorId) -> Self { Self { @@ -247,6 +269,12 @@ impl IdManager { proto_component_suffix_stream: Default::default(), } } + fn new_spec_var_stream(&self) -> SpecVarStream { + SpecVarStream { + connector_id: self.connector_id, + port_suffix_stream: self.port_suffix_stream.clone(), + } + } fn new_port_id(&mut self) -> PortId { Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into() } @@ -338,28 +366,19 @@ impl Connector { } impl Predicate { #[inline] - pub fn inserted(mut self, k: FiringVar, v: bool) -> Self { + pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self { self.assigned.insert(k, v); self } // returns true IFF self.unify would return Equivalent OR FormerNotLatter - pub fn satisfies(&self, other: &Self) -> bool { - let mut s_it = self.assigned.iter(); - let mut s = if let Some(s) = s_it.next() { - s - } else { - return other.assigned.is_empty(); - }; - for (oid, ob) in other.assigned.iter() { - while s.0 < oid { - s = if let Some(s) = s_it.next() { - s - } else { - return false; - }; - } - if s.0 > oid || s.1 != ob { - return false; + pub fn consistent_with(&self, other: &Self) -> bool { + let [larger, smaller] = + if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] }; + + for (var, val) in smaller.assigned.iter() { + match larger.assigned.get(var) { + Some(val2) if val2 != val => return false, + _ => {} } } true @@ -371,8 +390,8 @@ impl Predicate { /// If the resulting predicate is equivlanet to self, other, or both, /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. /// otherwise New(N) is returned. - fn common_satisfier(&self, other: &Self) -> CommonSatResult { - use CommonSatResult as Csr; + fn all_mapper(&self, other: &Self) -> AllMapperResult { + use AllMapperResult as Amr; // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; let [mut s, mut o] = [s_it.next(), o_it.next()]; @@ -403,7 +422,7 @@ impl Predicate { } else if sb != ob { assert_eq!(sid, oid); // both predicates assign the variable but differ on the value - return Csr::Nonexistant; + return Amr::Nonexistant; } else { // both predicates assign the variable to the same value s = s_it.next(); @@ -414,9 +433,9 @@ impl Predicate { } // Observed zero inconsistencies. A unified predicate exists... match [s_not_o.is_empty(), o_not_s.is_empty()] { - [true, true] => Csr::Equivalent, // ... equivalent to both. - [false, true] => Csr::FormerNotLatter, // ... equivalent to self. - [true, false] => Csr::LatterNotFormer, // ... equivalent to other. + [true, true] => Amr::Equivalent, // ... equivalent to both. + [false, true] => Amr::FormerNotLatter, // ... equivalent to self. + [true, false] => Amr::LatterNotFormer, // ... equivalent to other. [false, false] => { // ... which is the union of the predicates' assignments but // is equivalent to neither self nor other. @@ -424,7 +443,7 @@ impl Predicate { for (&id, &b) in o_not_s { new.assigned.insert(id, b); } - Csr::New(new) + Amr::New(new) } } } @@ -438,7 +457,7 @@ impl Predicate { } Some(res) } - pub fn query(&self, var: FiringVar) -> Option { + pub fn query(&self, var: SpecVar) -> Option { self.assigned.get(&var).copied() } } @@ -449,23 +468,7 @@ impl Debug for VecSet { } impl Debug for Predicate { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - struct MySet<'a>(&'a Predicate, bool); - impl Debug for MySet<'_> { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - let iter = self.0.assigned.iter().filter_map(|(port, &firing)| { - if firing == self.1 { - Some(port) - } else { - None - } - }); - f.debug_set().entries(iter).finish() - } - } - f.debug_struct("Predicate") - .field("Trues", &MySet(self, true)) - .field("Falses", &MySet(self, false)) - .finish() + f.debug_tuple("Predicate").field(&self.assigned).finish() } } impl serde::Serialize for SerdeProtocolDescription { @@ -486,40 +489,24 @@ impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription { Ok(Self(Arc::new(inner))) } } - -#[test] -fn bincode_serde() { - let mut b = Vec::with_capacity(64); - use bincode::config::Options; - let opt = bincode::config::DefaultOptions::default(); - opt.serialize_into(&mut b, &Decision::Failure).unwrap(); - println!("failure {:x?}", b); - b.clear(); - - opt.serialize_into(&mut b, &CommMsgContents::Suggest { suggestion: Decision::Failure }) - .unwrap(); - println!("decision {:x?}", b); - b.clear(); - - opt.serialize_into( - &mut b, - &CommMsg { - round_index: 4, - contents: CommMsgContents::Suggest { suggestion: Decision::Failure }, - }, - ) - .unwrap(); - println!("commmsg {:x?}", b); - b.clear(); - - opt.serialize_into( - &mut b, - &Msg::CommMsg(CommMsg { - round_index: 4, - contents: CommMsgContents::Suggest { suggestion: Decision::Failure }, - }), - ) - .unwrap(); - println!("msg {:x?}", b); - b.clear(); +impl Debug for SpecVar { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_tuple("vrID").field(&self.0).finish() + } +} +impl SpecVal { + const FIRING: Self = SpecVal(1); + const SILENT: Self = SpecVal(0); + fn is_firing(self) -> bool { + self == Self::FIRING + // all else treated as SILENT + } + fn iter_domain() -> impl Iterator { + (0..).map(SpecVal) + } +} +impl Debug for SpecVal { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + self.0.fmt(f) + } } diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index ab12b429d02be2e6aa815d43612319c5744d0efe..6822e4b907b0cbdb2719e028e334fa5ceaef360b 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -87,7 +87,9 @@ impl Connector { native_batches: vec![Default::default()], round_result: Ok(None), }; - session_optimize(cu, &mut comm, deadline)?; + if cfg!(feature = "session_optimization") { + session_optimize(cu, &mut comm, deadline)?; + } log!(cu.logger, "connect() finished. setup phase complete"); self.phased = ConnectorPhased::Communication(Box::new(comm)); Ok(()) diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index 6f8ff25182f6097b3016a470abf682bcb706f97f..111b78c27c30c090af0255c18085ec52122ab3c5 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -577,3 +577,12 @@ fn together() { }) .unwrap(); } + +#[test] +fn native_batch_distinguish() { + let test_log_path = Path::new("./logs/native_batch_distinguish"); + let mut c = file_logged_connector(0, test_log_path); + c.connect(Some(Duration::from_secs(1))).unwrap(); + c.next_batch().unwrap(); + c.sync(Some(Duration::from_secs(3))).unwrap(); +}