From 0a71d0af9edf8d718f22f6944748a311c1ae8b66 2020-03-03 15:04:11 From: Christopher Esterhuyse Date: 2020-03-03 15:04:11 Subject: [PATCH] histories --- diff --git a/src/common.rs b/src/common.rs index 8a90df240d15bf2ff155733cd9228de1738694c2..91e4d451c504b9112ebaaa1bb1180b4eecb37ab2 100644 --- a/src/common.rs +++ b/src/common.rs @@ -41,7 +41,7 @@ pub enum Polarity { Getter, // input port (from the perspective of the component) } -#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone, Debug)] +#[derive(Eq, PartialEq, Ord, PartialOrd, Hash, Copy, Clone)] #[repr(C)] pub struct Port(pub usize); // ports are COPY pub type Key = Port; @@ -111,6 +111,11 @@ pub trait PolyContext { } ///////////////////// IMPL ///////////////////// +impl Debug for Port { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "Port({})", self.0) + } +} impl Key { pub fn from_raw(raw: usize) -> Self { Self(raw) diff --git a/src/protocol/eval.rs b/src/protocol/eval.rs index edf19cb1bb493fb8b200685d2b84608eed005728..0d93e716d10691dc11f2034e921ffe60ed518657 100644 --- a/src/protocol/eval.rs +++ b/src/protocol/eval.rs @@ -1457,7 +1457,7 @@ impl Store { let var = var.declaration.unwrap(); subject = self.map.get(&var).unwrap(); } - _ => unreachable!(), + q => unreachable!("Reached {:?}", q), } match subject.get(&index) { Some(value) => Ok(value), @@ -1472,7 +1472,7 @@ impl Store { let var = var.declaration.unwrap(); subject = self.map.get(&var).unwrap(); } - _ => unreachable!(), + q => unreachable!("Reached {:?}", q), } match subject.length() { Some(value) => Ok(value), diff --git a/src/runtime/actors.rs b/src/runtime/actors.rs index c9ea8944f8b1ae6e53be625f437cc000cd5a16b2..bb402ce1bf5db6f999db1e2072a20671ea22097d 100644 --- a/src/runtime/actors.rs +++ b/src/runtime/actors.rs @@ -1,7 +1,7 @@ use crate::common::*; use crate::runtime::{endpoint::*, *}; -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct MonoN { pub ekeys: HashSet, pub result: Option<(usize, HashMap)>, @@ -18,7 +18,7 @@ pub(crate) struct BranchN { pub sync_batch_index: usize, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MonoP { pub state: ProtocolS, pub ekeys: HashSet, @@ -31,6 +31,7 @@ pub(crate) struct PolyP { } #[derive(Debug, Clone)] pub(crate) struct BranchP { + pub blocking_on: Option, pub outbox: HashMap, pub inbox: HashMap, pub state: ProtocolS, @@ -89,6 +90,7 @@ impl PolyP { // don't rerun now. Rerun at next `sync_run` log!(&mut m_ctx.inner.logger, "~ ... Delay {:?}", m_ctx.my_subtree_id,); + branch.blocking_on = Some(ekey); self.incomplete.insert(predicate, branch); } else { log!(&mut m_ctx.inner.logger, "~ ... Drop {:?}", m_ctx.my_subtree_id,); @@ -214,12 +216,16 @@ impl PolyP { &payload_predicate ); branch.inbox.insert(ekey, payload); - vec![(payload_predicate, branch)] + if branch.blocking_on == Some(ekey) { + branch.blocking_on = None; + vec![(payload_predicate, branch)] + } else { + vec![] + } } else { log!( &mut m_ctx.inner.logger, "... poly_recv_run didn't have any exact matches... Let's try feed it to all branches", - ); let mut incomplete2 = HashMap::<_, _>::default(); let to_run = self @@ -241,7 +247,15 @@ impl PolyP { assert_eq!(prev_payload, &payload); } branch.inbox.insert(ekey, payload.clone()); - Some((old_predicate, branch)) + if branch.blocking_on == Some(ekey) { + // run. + branch.blocking_on = None; + Some((old_predicate, branch)) + } else { + // don't bother running. its awaiting something else + incomplete2.insert(old_predicate, branch); + None + } } Csr::New(new) => { log!( @@ -261,7 +275,15 @@ impl PolyP { // put the original back untouched incomplete2.insert(old_predicate, branch); - Some((new, payload_branch)) + if payload_branch.blocking_on == Some(ekey) { + // run the fork + payload_branch.blocking_on = None; + Some((new, payload_branch)) + } else { + // don't bother running. its awaiting something else + incomplete2.insert(new, payload_branch); + None + } } Csr::LatterNotFormer => { log!( @@ -279,8 +301,16 @@ impl PolyP { payload_branch.inbox.insert(ekey, payload.clone()); // put the original back untouched - incomplete2.insert(old_predicate, branch); - Some((payload_predicate.clone(), payload_branch)) + incomplete2.insert(old_predicate.clone(), branch); + if payload_branch.blocking_on == Some(ekey) { + // run the fork + payload_branch.blocking_on = None; + Some((old_predicate, payload_branch)) + } else { + // don't bother running. its awaiting something else + incomplete2.insert(old_predicate, payload_branch); + None + } } Csr::Nonexistant => { log!( @@ -307,21 +337,11 @@ impl PolyP { self.poly_run_these_branches(m_ctx, protocol_description, to_run) } - pub(crate) fn become_mono( - mut self, - decision: &Predicate, - table_row: &mut HashMap, - ) -> MonoP { - if let Some((_, branch)) = self.complete.drain().find(|(p, _)| decision.satisfies(p)) { - let BranchP { inbox, state, outbox } = branch; - for (key, payload) in inbox.into_iter().chain(outbox.into_iter()) { - table_row.insert(key, payload); - } - self.incomplete.clear(); - MonoP { state, ekeys: self.ekeys } - } else { - panic!("No such solution!") - } + pub(crate) fn choose_mono(&self, decision: &Predicate) -> Option { + self.complete + .iter() + .find(|(p, _)| decision.satisfies(p)) + .map(|(_, branch)| MonoP { state: branch.state.clone(), ekeys: self.ekeys.clone() }) } } @@ -396,32 +416,13 @@ impl PolyN { std::mem::swap(&mut branches2, &mut self.branches); } - pub fn become_mono( - mut self, - logger: &mut String, - decision: &Predicate, - table_row: &mut HashMap, - ) -> MonoN { - log!( - logger, - "decision {:?} with branch preds {:?}", - decision, - self.branches.iter().collect::>() - ); - if let Some((branch_pred, branch)) = self - .branches - .drain() + pub fn choose_mono(&self, decision: &Predicate) -> Option { + self.branches + .iter() .find(|(p, branch)| branch.to_get.is_empty() && decision.satisfies(p)) - { - log!(logger, "decision {:?} mapped to branch {:?}", decision, branch_pred); - let BranchN { gotten, sync_batch_index, .. } = branch; - for (&key, payload) in gotten.iter() { - assert!(table_row.insert(key, payload.clone()).is_none()); - } - MonoN { ekeys: self.ekeys, result: Some((sync_batch_index, gotten)) } - } else { - log!(logger, "decision {:?} HAD NO SOLUTION!!?", decision); - panic!("No such solution!") - } + .map(|(_, branch)| { + let BranchN { gotten, sync_batch_index, .. } = branch.clone(); + MonoN { ekeys: self.ekeys.clone(), result: Some((sync_batch_index, gotten)) } + }) } } diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index a9971f53b15c21af0e7fd62fcc3d2fa83f32a875..85ba76f4aa297db6778236b802b8b75234439a98 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -2,34 +2,33 @@ use crate::common::*; use crate::runtime::{actors::*, endpoint::*, errors::*, *}; impl Controller { + //usable BETWEEN rounds + fn consistent(&self) -> bool { + self.inner.mono_n.is_some() + } + fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> { log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); - let mut table_row = HashMap::::default(); - // 1. become_mono for Poly actors - self.inner.mono_n = - self.ephemeral.poly_n.take().map(|poly_n| { - poly_n.become_mono(&mut self.inner.logger, &decision, &mut table_row) - }); - self.inner.mono_ps.extend( - self.ephemeral.poly_ps.drain(..).map(|m| m.become_mono(&decision, &mut table_row)), - ); + // let mut table_row = HashMap::::default(); - // convert (Key=>Payload) map to (ChannelId=>Payload) map. - let table_row: HashMap<_, _> = table_row - .into_iter() - .map(|(ekey, msg)| { - let channel_id = self.inner.endpoint_exts.get(ekey).unwrap().info.channel_id; - (channel_id, msg) + let n_pair = { + let poly_n = self.ephemeral.poly_n.take().unwrap(); + let mono_n = poly_n.choose_mono(&decision).unwrap(); + (mono_n, poly_n) + }; + self.inner.mono_n = Some(n_pair.0.clone()); + + let p_pairs = self + .ephemeral + .poly_ps + .drain(..) + .map(|poly_p| { + let mono_p = poly_p.choose_mono(&decision).unwrap(); + (mono_p, poly_p) }) - .collect(); - // log all firing ports - for (channel_id, payload) in table_row { - log!(&mut self.inner.logger, "VALUE {:?} => Message({:?})", channel_id, payload); - } - // log all silent ports - for channel_id in decision.iter_matching(false) { - log!(&mut self.inner.logger, "VALUE {:?} => *", channel_id); - } + .collect::>(); + self.inner.mono_ps.extend(p_pairs.iter().map(|p_pair| p_pair.0.clone())); + self.round_histories.push(RoundHistory::Consistent(decision.clone(), n_pair, p_pairs)); let announcement = CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index); for &child_ekey in self.inner.family.children_ekeys.iter() { @@ -139,15 +138,44 @@ impl Controller { Ok(PolyN { ekeys, branches }) } - // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. - // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. pub fn sync_round( &mut self, deadline: Instant, sync_batches: Option>, ) -> Result<(), SyncErr> { - // TODO! fuse handle_locals_return_decision and end_round_return_decision + if !self.consistent() { + // was previously inconsistent + return Err(SyncErr::Inconsistent); + } + match self.sync_round_inner(deadline, sync_batches) { + Ok(()) => Ok(()), + Err(e) => { + log!( + &mut self.inner.logger, + "/\\/\\/\\/\\/\\/ Sync round failed! Preparing for diagnosis...", + ); + let h = RoundHistory::Inconsistent( + std::mem::take(&mut self.ephemeral.solution_storage), + self.ephemeral.poly_n.take().unwrap(), + std::mem::take(&mut self.ephemeral.poly_ps), + ); + self.round_histories.push(h); + for (round_index, round) in self.round_histories.iter().enumerate() { + log!(&mut self.inner.logger, "round {}:{:#?}\n", round_index, round); + } + Err(e) + } + } + } + + // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. + // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. + fn sync_round_inner( + &mut self, + deadline: Instant, + sync_batches: Option>, + ) -> Result<(), SyncErr> { assert!(self.ephemeral.is_clear()); log!( @@ -287,11 +315,7 @@ impl Controller { 'recv_loop: loop { log!(&mut self.inner.logger, "`POLLING`..."); let received = self.recv(deadline)?.ok_or_else(|| { - log!( - &mut self.inner.logger, - ":( timing out. Solutions storage in state... {:#?}", - &self.ephemeral.solution_storage - ); + log!(&mut self.inner.logger, ":( timing out"); SyncErr::Timeout })?; log!(&mut self.inner.logger, "::: message {:?}...", &received); @@ -481,6 +505,7 @@ impl Into for MonoP { state: self.state, inbox: Default::default(), outbox: Default::default(), + blocking_on: None, } }, ekeys: self.ekeys, diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index eb8b9070a228c6849028a08cdbb06c0c6d2d5ddf..fc81a34aa55275f57d89ec6ca7b85c4a5838dc22 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -6,7 +6,7 @@ pub(crate) mod communication; pub(crate) mod connector; pub(crate) mod endpoint; pub mod errors; -pub mod experimental; +// pub mod experimental; mod serde; pub(crate) mod setup; @@ -95,11 +95,18 @@ struct ChannelIdStream { next_channel_index: ChannelIndex, } +#[derive(Debug)] +enum RoundHistory { + Consistent(Predicate, (MonoN, PolyN), Vec<(MonoP, PolyP)>), + Inconsistent(SolutionStorage, PolyN, Vec), +} + #[derive(Debug)] struct Controller { protocol_description: Arc, inner: ControllerInner, ephemeral: ControllerEphemeral, + round_histories: Vec, } #[derive(Debug)] struct ControllerInner { diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index e17a14087898ccb8efc6d03946a66a7428e30a6b..76092a343447c3472b145338036282ce5b169f48 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -127,7 +127,12 @@ impl Controller { round_index: 0, logger, }; - let controller = Self { protocol_description, inner, ephemeral: Default::default() }; + let controller = Self { + protocol_description, + inner, + ephemeral: Default::default(), + round_histories: vec![], + }; Ok((controller, native_interface)) } diff --git a/src/test/connector.rs b/src/test/connector.rs index c0454ba8b84f8a78b03090d030b3ab13c3f79f99..ac764a75602b30085b938d09b5eaa418579814f0 100644 --- a/src/test/connector.rs +++ b/src/test/connector.rs @@ -79,6 +79,27 @@ primitive fifo_1(msg m, in i, out o) { composite fifo_1_e(in i, out o) { new fifo_1(null, i, o); } +primitive samelen(in a, in b, out c) { + synchronous { + msg m = get(a); + msg n = get(b); + assert(m.length == n.length); + put(c, m); + } +} +primitive repl2(in a, out b, out c) { + synchronous { + msg m = get(a); + put(b, m); + put(c, m); + } +} +composite samelen_repl(in a, out b) { + channel c -> d; + channel e -> f; + new samelen(a, f, c); + new repl2(d, b, e); +} "; #[test] @@ -680,3 +701,77 @@ fn connector_fifo_1_e() { }, ])); } + +#[test] +#[should_panic] +fn connector_causal_loop() { + /* + /-->\ /-->P|A-->\ /-->\ + Alice exchange exchange Bob + \<--/ \<--P|A<--/ \<--/ + */ + let timeout = Duration::from_millis(1_500); + let addrs = [next_addr(), next_addr()]; + const N: usize = 1; + assert!(run_connector_set(&[ + // + &|x| { + // Alice + x.configure(PDL, b"exchange").unwrap(); + x.bind_port(0, Passive(addrs[0])).unwrap(); // peer out + x.bind_port(1, Passive(addrs[1])).unwrap(); // peer in + x.bind_port(2, Native).unwrap(); // native in + x.bind_port(3, Native).unwrap(); // native out + x.connect(timeout).unwrap(); + for _ in 0..N { + assert_eq!(Ok(()), x.put(0, b"A->B".to_vec())); + assert_eq!(Ok(()), x.get(1)); + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(b"B->A" as &[u8]), x.read_gotten(1)); + } + }, + &|x| { + // Bob + x.configure(PDL, b"exchange").unwrap(); + x.bind_port(0, Active(addrs[1])).unwrap(); // peer out + x.bind_port(1, Active(addrs[0])).unwrap(); // peer in + x.bind_port(2, Native).unwrap(); // native in + x.bind_port(3, Native).unwrap(); // native out + x.connect(timeout).unwrap(); + for _ in 0..N { + assert_eq!(Ok(()), x.put(0, b"B->A".to_vec())); + assert_eq!(Ok(()), x.get(1)); + assert_eq!(Ok(0), x.sync(timeout)); + assert_eq!(Ok(b"A->B" as &[u8]), x.read_gotten(1)); + } + }, + ])); +} + +#[test] +#[should_panic] +fn connector_causal_loop2() { + /* + /-->\ /<---\ + Alice samelen-->repl + \<-------------/ + */ + let timeout = Duration::from_millis(1_500); + // let addrs = [next_addr(), next_addr()]; + const N: usize = 1; + assert!(run_connector_set(&[ + // + &|x| { + // Alice + x.configure(PDL, b"samelen_repl").unwrap(); + x.bind_port(0, Native).unwrap(); + x.bind_port(1, Native).unwrap(); + x.connect(timeout).unwrap(); + for _ in 0..N { + assert_eq!(Ok(()), x.put(0, b"foo".to_vec())); + assert_eq!(Ok(()), x.get(1)); + assert_eq!(Ok(0), x.sync(timeout)); + } + }, + ])); +}