From 800041f1651442a594befdcc7de7162375ad9959 2020-02-12 18:08:48 From: Christopher Esterhuyse Date: 2020-02-12 18:08:48 Subject: [PATCH] playing around with new predicate=>state,payload storages. toward faster querying but also toward deviation detection by way of intelligible error handling --- diff --git a/src/protocol/eval.rs b/src/protocol/eval.rs index 734b67d6e41e1509550741ab3be0b9f20e9c3467..edf19cb1bb493fb8b200685d2b84608eed005728 100644 --- a/src/protocol/eval.rs +++ b/src/protocol/eval.rs @@ -253,9 +253,7 @@ impl Value { (Value::Int(IntValue(s)), Value::Short(ShortValue(o))) => { Value::Int(IntValue(*s + *o as i32)) } - (Value::Int(IntValue(s)), Value::Int(IntValue(o))) => { - Value::Int(IntValue(*s + *o)) - } + (Value::Int(IntValue(s)), Value::Int(IntValue(o))) => Value::Int(IntValue(*s + *o)), (Value::Int(IntValue(s)), Value::Long(LongValue(o))) => { Value::Long(LongValue(*s as i64 + *o)) } @@ -306,9 +304,7 @@ impl Value { (Value::Int(IntValue(s)), Value::Short(ShortValue(o))) => { Value::Int(IntValue(*s - *o as i32)) } - (Value::Int(IntValue(s)), Value::Int(IntValue(o))) => { - Value::Int(IntValue(*s - *o)) - } + (Value::Int(IntValue(s)), Value::Int(IntValue(o))) => Value::Int(IntValue(*s - *o)), (Value::Int(IntValue(s)), Value::Long(LongValue(o))) => { Value::Long(LongValue(*s as i64 - *o)) } @@ -359,9 +355,7 @@ impl Value { (Value::Int(IntValue(s)), Value::Short(ShortValue(o))) => { Value::Int(IntValue(*s % *o as i32)) } - (Value::Int(IntValue(s)), Value::Int(IntValue(o))) => { - Value::Int(IntValue(*s % *o)) - } + (Value::Int(IntValue(s)), Value::Int(IntValue(o))) => Value::Int(IntValue(*s % *o)), (Value::Int(IntValue(s)), Value::Long(LongValue(o))) => { Value::Long(LongValue(*s as i64 % *o)) } @@ -1447,7 +1441,10 @@ impl Store { match &h[rexpr] { Expression::Variable(var) => { let var = var.declaration.unwrap(); - let value = self.map.get(&var).expect(&format!("Uninitialized variable {:?}", h[h[var].identifier()])); + let value = self + .map + .get(&var) + .expect(&format!("Uninitialized variable {:?}", h[h[var].identifier()])); Ok(value.clone()) } Expression::Indexing(indexing) => { @@ -1519,7 +1516,7 @@ impl Store { match expr.operation { BinaryOperator::LogicalAnd => { if left.as_boolean().0 == false { - return Ok(left) + return Ok(left); } right = self.eval(h, ctx, expr.right)?; right.as_boolean(); // panics if not a boolean @@ -1527,7 +1524,7 @@ impl Store { } BinaryOperator::LogicalOr => { if left.as_boolean().0 == true { - return Ok(left) + return Ok(left); } right = self.eval(h, ctx, expr.right)?; right.as_boolean(); // panics if not a boolean @@ -1577,7 +1574,7 @@ impl Store { elements.push(self.eval(h, ctx, elem)?); } todo!() - }, + } Expression::Constant(expr) => Ok(Value::from_constant(&expr.value)), Expression::Call(expr) => match expr.method { Method::Create => { @@ -1670,7 +1667,7 @@ impl Prompt { // Store the values in the declared variables self.store.initialize(h, stmt.from.upcast(), from); self.store.initialize(h, stmt.to.upcast(), to); - }, + } } // Continue to next statement self.position = stmt.next(); diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index f43bf2ce7f3d8dad914a1af7dffddfb9e4a36143..c9ea8944f8b1ae6e53be625f437cc000cd5a16b2 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -161,6 +161,13 @@ impl PolyP { payload, } .into_msg(m_ctx.inner.round_index); + log!( + &mut m_ctx.inner.logger, + "~ ... ... PolyP sending msg {:?} to {:?} ({:?}) now!", + &msg, + ekey, + (info.channel_id.controller_id, info.channel_id.channel_index), + ); endpoint.send(msg)?; to_run.push((predicate, branch)); } @@ -228,8 +235,12 @@ impl PolyP { &old_predicate ); // old_predicate COVERS the assumptions of payload_predicate - let was = branch.inbox.insert(ekey, payload.clone()); - assert!(was.is_none()); // INBOX MUST BE EMPTY! + + if let Some(prev_payload) = branch.inbox.get(&ekey) { + // Incorrect to receive two distinct messages in same branch! + assert_eq!(prev_payload, &payload); + } + branch.inbox.insert(ekey, payload.clone()); Some((old_predicate, branch)) } Csr::New(new) => { @@ -242,8 +253,11 @@ impl PolyP { ); // payload_predicate has new assumptions. FORK! let mut payload_branch = branch.clone(); - let was = payload_branch.inbox.insert(ekey, payload.clone()); - assert!(was.is_none()); // INBOX MUST BE EMPTY! + if let Some(prev_payload) = payload_branch.inbox.get(&ekey) { + // Incorrect to receive two distinct messages in same branch! + assert_eq!(prev_payload, &payload); + } + payload_branch.inbox.insert(ekey, payload.clone()); // put the original back untouched incomplete2.insert(old_predicate, branch); @@ -258,8 +272,11 @@ impl PolyP { ); // payload_predicate has new assumptions. FORK! let mut payload_branch = branch.clone(); - let was = payload_branch.inbox.insert(ekey, payload.clone()); - assert!(was.is_none()); // INBOX MUST BE EMPTY! + if let Some(prev_payload) = payload_branch.inbox.get(&ekey) { + // Incorrect to receive two distinct messages in same branch! + assert_eq!(prev_payload, &payload); + } + payload_branch.inbox.insert(ekey, payload.clone()); // put the original back untouched incomplete2.insert(old_predicate, branch); diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 81b9d1b3a649aeae390688ec05ae089980077787..a9971f53b15c21af0e7fd62fcc3d2fa83f32a875 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -282,9 +282,10 @@ impl Controller { } // 4. Receive incoming messages until the DECISION is made - log!(&mut self.inner.logger, "No decision yet. Time to recv messages"); + log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages"); self.undelay_all(); 'recv_loop: loop { + log!(&mut self.inner.logger, "`POLLING`..."); let received = self.recv(deadline)?.ok_or_else(|| { log!( &mut self.inner.logger, @@ -293,9 +294,9 @@ impl Controller { ); SyncErr::Timeout })?; + log!(&mut self.inner.logger, "::: message {:?}...", &received); let current_content = match received.msg { Msg::SetupMsg(_) => { - log!(&mut self.inner.logger, "recvd message {:?} and its SETUP :(", &received); // This occurs in the event the connector was malformed during connect() return Err(SyncErr::UnexpectedSetupMsg); } @@ -303,7 +304,7 @@ impl Controller { if round_index < self.inner.round_index => { // Old message! Can safely discard - log!(&mut self.inner.logger, "recvd message {:?} and its OLD! :(", &received); + log!(&mut self.inner.logger, "...and its OLD! :("); drop(received); continue 'recv_loop; } @@ -311,19 +312,14 @@ impl Controller { if round_index > self.inner.round_index => { // Message from a next round. Keep for later! - log!( - &mut self.inner.logger, - "ecvd message {:?} and its for later. DELAY! :(", - &received - ); + log!(&mut self.inner.logger, "... DELAY! :("); self.delay(received); continue 'recv_loop; } Msg::CommMsg(CommMsg { contents, round_index }) => { log!( &mut self.inner.logger, - "recvd a round-appropriate CommMsg {:?} with key {:?}", - &contents, + "... its a round-appropriate CommMsg with key {:?}", received.recipient ); assert_eq!(round_index, self.inner.round_index); diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index b67ca1439c34d04faf9a69f230f6cd8240d9ff7f..142c00a4bf23e9f9d9e89b5f4a325f37ace80bde 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -6,7 +6,8 @@ pub(crate) mod communication; pub(crate) mod connector; pub(crate) mod endpoint; pub mod errors; -mod predicate; // TODO later +// mod predicate; // TODO later +mod polyp; mod serde; pub(crate) mod setup; @@ -234,7 +235,7 @@ impl Debug for SolutionStorage { f.pad("Solutions: [")?; for (subtree_id, &index) in self.subtree_id_to_index.iter() { let sols = &self.subtree_solutions[index]; - f.write_fmt(format_args!("{:?} => {:?}, ", subtree_id, sols))?; + f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; } f.pad("]") } diff --git a/src/runtime/polyp.rs b/src/runtime/polyp.rs new file mode 100644 index 0000000000000000000000000000000000000000..0a343d9d92022e14fbac4b1b9bdeea322bc8e4c9 --- /dev/null +++ b/src/runtime/polyp.rs @@ -0,0 +1,279 @@ +use crate::common::*; +use crate::runtime::{Predicate, ProtocolS}; +use core::ops::{Index, IndexMut}; +use std::collections::HashMap; +use std::num::NonZeroU32; + +// struct SpeculationBranch { +// inbox: HashMap, +// outbox: HashMap, +// inner: SpeculationBranchInner, +// known: HashMap, +// } +// enum SpeculationBranchInner { +// Leaf(ProtocolS), + +// // invariant: channel_id branching is redundantly represented by true_false branches' known assignments +// // => true_false[0].known[channel_id] == Some(true) +// // => true_false[1].known[channel_id] == Some(false) +// Fork { channel_id: ChannelId, true_false: Box<[SpeculationBranch; 2]> }, +// } + +// impl SpeculationBranch { +// fn new_tree(init: ProtocolS) -> Self { +// SpeculationBranch { +// inbox: Default::default(), +// outbox: Default::default(), +// known: Default::default(), +// inner: SpeculationBranchInner::Leaf(init), +// } +// } + +// fn feed_msg( +// &mut self, +// ekey: Key, +// payload: &Payload, +// predicate: &Predicate, +// pred: Option, +// ) { +// use SpeculationBranchInner as Sbi; +// let next_pred = Some(Pred { known: &self.known, prev: pred.as_ref() }); +// match &mut self.inner { +// Sbi::Leaf(_state) => { +// if self.inbox.insert(ekey, payload.clone()).is_none() { +// // run this machine +// } +// } +// Sbi::Fork { channel_id, true_false } => match predicate.query(*channel_id) { +// Some(true) => true_false[0].feed_msg(ekey, payload, predicate, next_pred), // feed true +// Some(false) => true_false[1].feed_msg(ekey, payload, predicate, next_pred), // feed false +// None => { +// // feed to both true and false branches +// for x in true_false.iter_mut() { +// x.feed_msg(ekey, payload, predicate, next_pred); +// } +// } +// }, +// } +// } +// } + +// #[derive(Copy, Clone)] +// struct Pred<'a> { +// known: &'a HashMap, +// prev: Option<&'a Pred<'a>>, +// } + +struct Branch { + state: Option, + speculation: Option, +} +struct Speculation { + on: ChannelId, + t: Option, + f: Option, +} + +struct Tree { + branches: Vec, // invariant: non-empty. root at index 0 + states: Vec, +} +impl Tree { + /// determine where in the tree the given message should be inserted (based on the predicate). + /// run all machines + fn feed_and_run(&mut self, predicate: Predicate, payload: &Payload) { + let q = Queryable::new(&predicate); + let mut qs = QueryableSubset::new(&q); + self.branches[0].feed_and_run(payload, &q, &mut qs); + } +} + +struct Queryable(Vec<(ChannelId, bool)>); +impl Queryable { + fn new(predicate: &Predicate) -> Self { + let mut vec: Vec<_> = predicate.assigned.iter().map(|(&k, &v)| (k, v)).collect(); + vec.sort_by(|(a, _), (b, _)| a.cmp(b)); + Self(vec) + } + fn query(&self, channel_id: ChannelId) -> Option<(usize, bool)> { + self.0 + .binary_search_by(|(cid, _)| cid.cmp(&channel_id)) + .ok() + .map(|index| (index, self.0[index].1)) + } +} +struct QueryableSubset { + buf: Vec, + prefix_end: usize, +} +impl QueryableSubset { + fn new(q: &Queryable) -> Self { + let prefix_end = q.0.len(); + Self { buf: (0..prefix_end).collect(), prefix_end } + } + fn remove(&mut self, at: usize) { + self.prefix_end -= 1; + self.buf.swap(self.prefix_end, at); + } + fn undo_remove(&mut self, at: usize) { + self.buf.swap(self.prefix_end, at); + self.prefix_end += 1; + } + fn iter_q<'a: 'b, 'b>( + &'a self, + q: &'b Queryable, + ) -> impl Iterator { + self.buf[..self.prefix_end].iter().map(move |&index| &q.0[index]) + } +} + +impl Branch { + // invariant: q.0 is sorted + // + // invariant: qs.buf[0..qs.prefix_end] is a slice that encodes the set of INDICES in q.0 + // which the path to this branch has NOT queried. + // + // ie. for a given predicate {X=>true, Z=>true, Y=>false} + // => q is [(X,true), (Y,false), (Z,true)] + // => qs is initially [0,1,2] + // and if this branch queries 1, the subtree will receive qs as [0,2] + fn feed_and_run(&mut self, payload: &Payload, q: &Queryable, qs: &mut QueryableSubset) { + match &mut self.speculation { + Some(Speculation { on, t, f }) => { + if let Some((index, assignment)) = q.query(*on) { + // if assignment + } else { + } + todo!() + } + None => { + // + todo!() + } + } + } +} +impl Index for Tree { + type Output = Branch; + fn index(&self, k: BranchKey) -> &Self::Output { + &self.branches[(k.index_plus_one.get() - 1) as usize] + } +} +impl IndexMut for Tree { + fn index_mut(&mut self, k: BranchKey) -> &mut Self::Output { + &mut self.branches[(k.index_plus_one.get() - 1) as usize] + } +} +impl Index for Tree { + type Output = ProtocolS; + fn index(&self, k: StateKey) -> &Self::Output { + &self.states[(k.index_plus_one.get() - 1) as usize] + } +} +impl IndexMut for Tree { + fn index_mut(&mut self, k: StateKey) -> &mut Self::Output { + &mut self.states[(k.index_plus_one.get() - 1) as usize] + } +} + +struct BranchKey { + index_plus_one: NonZeroU32, +} +struct StateKey { + index_plus_one: NonZeroU32, +} + +struct Bitset { + bits: Vec, +} + +struct Polyp { + inbox: Vec, + inbox_masks: BitMasks, + states: Vec, + states_masks: BitMasks, +} + +// invariant: last element is not zero. +// => all values out of bounds are implicitly absent +#[derive(Debug, Default)] +struct BitSet(Vec); + +#[derive(Debug, Default)] +struct BitMasks(HashMap<(ChannelId, bool), BitSet>); + +struct BitSetAndIter<'a> { + // this value is immutable + // invariant: !sets.is_empty() + sets: &'a [&'a [u32]], + next_u32_index: usize, // invariant: in 0..32 while iterating + next_bit_index: usize, + cached: Option, // None <=> iterator is done +} +impl<'a> BitSetAndIter<'a> { + fn new(sets: &'a [&'a [u32]]) -> Self { + const EMPTY_SINGLETON: &[&[u32]] = &[&[]]; + let sets = if sets.is_empty() { EMPTY_SINGLETON } else { sets }; + Self { sets, next_u32_index: 0, next_bit_index: 0, cached: Self::nth_u32(sets, 0) } + } + fn nth_u32(sets: &'a [&'a [u32]], index: usize) -> Option { + sets.iter().fold(Some(!0), |a, b| { + let b = b.get(index)?; + Some(a? & b) + }) + } + fn next_chunk(&mut self) { + self.next_bit_index = 0; + self.next_u32_index += 1; + self.cached = Self::nth_u32(self.sets, self.next_u32_index); + } +} +impl Iterator for BitSetAndIter<'_> { + type Item = usize; + fn next(&mut self) -> Option { + loop { + // get cached chunk. If none exists, iterator is done. + let mut chunk = self.cached?; + if chunk == 0 { + self.next_chunk(); + continue; + } + // this chunk encodes 1+ Items to yield + // shift the contents of chunk until the least significant bit is 1 + + #[inline(always)] + fn shifty(chunk: &mut u32, shift_by: usize, next_bit_index: &mut usize) { + if *chunk & ((1 << shift_by) - 1) == 0 { + *next_bit_index += shift_by; + *chunk >>= shift_by; + } + } + shifty(&mut chunk, 16, &mut self.next_bit_index); + shifty(&mut chunk, 08, &mut self.next_bit_index); + shifty(&mut chunk, 04, &mut self.next_bit_index); + shifty(&mut chunk, 02, &mut self.next_bit_index); + shifty(&mut chunk, 01, &mut self.next_bit_index); + // assert(chunk & 1 == 1) + let index = self.next_u32_index * 32 + self.next_bit_index; + self.next_bit_index += 1; + self.cached = Some(chunk >> 1); + if chunk > 0 { + // assert(self.next_bit_index <= 32) + // because index was calculated with self.next_bit_index - 1 + return Some(index); + } + } + } +} + +#[test] +fn test_bit_iter() { + static SETS: &[&[u32]] = &[ + // + &[0b100011000000100101101], + &[0b100001000000000110100], + &[0b100001000100010100110], + ]; + let indices = BitSetAndIter::new(SETS).collect::>(); + println!("indices {:?}", indices); +} diff --git a/src/runtime/predicate.rs b/src/runtime/predicate.rs index 00920ec84a49458360252c9707558e4139609a57..c3c1299a25a78fe1044543127a8a705f17949bdc 100644 --- a/src/runtime/predicate.rs +++ b/src/runtime/predicate.rs @@ -1,6 +1,7 @@ -use crate::common::ChannelId; -use crate::common::ChannelIndex; -use crate::common::ControllerId; +use crate::common::*; +use crate::runtime::ProtocolS; +use core::ops::Index; +use core::ops::IndexMut; use std::collections::BTreeMap; @@ -235,3 +236,5 @@ impl Predicate { todo!() } } + +//////////////////////////// diff --git a/src/test/connector.rs b/src/test/connector.rs index f54cb2aca442d187b0c3da8d0cc7f375fbfde1fd..4e191050d0e0a0f17e89a01308f4d22994df31b7 100644 --- a/src/test/connector.rs +++ b/src/test/connector.rs @@ -593,9 +593,9 @@ fn routing_filter() { /* Sender -->filter-->P|A-->sync--> Receiver */ - let timeout = Duration::from_millis(1_500); + let timeout = Duration::from_millis(3_000); let addrs = [next_addr()]; - const N: usize = 10; + const N: usize = 1; assert!(run_connector_set(&[ // &|x| {