From af5facdd41b191837102d2e98ff4d4a3ff51e47f 2020-03-06 09:06:49 From: Christopher Esterhuyse Date: 2020-03-06 09:06:49 Subject: [PATCH] generalized announcement to Decision enum, which also covers TIMEOUT case --- diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 6991a81de7c6923037d2b8f02bba06c84ee58cca..6df77b1bb4258c18b5b0bde62ac79e5d09c250ff 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -2,40 +2,21 @@ use crate::common::*; use crate::runtime::{actors::*, endpoint::*, errors::*, *}; impl Controller { - //usable BETWEEN rounds - fn consistent(&self) -> bool { - self.inner.mono_n.is_some() - } - - fn end_round_with_decision(&mut self, decision: Predicate) -> Result<(), SyncErr> { + fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> { log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); - // let mut table_row = HashMap::::default(); - - let n_pair = { - let poly_n = self.ephemeral.poly_n.take().unwrap(); - let mono_n = poly_n.choose_mono(&decision).unwrap(); - (mono_n, poly_n) - }; - self.inner.mono_n = Some(n_pair.0.clone()); - - let p_pairs = self - .ephemeral - .poly_ps - .drain(..) - .map(|poly_p| { - let mono_p = poly_p.choose_mono(&decision).unwrap(); - (mono_p, poly_p) - }) - .collect::>() - .into_boxed_slice(); - self.inner.mono_ps.extend(p_pairs.iter().map(|p_pair| p_pair.0.clone())); - self.round_histories.push(RoundHistory::Consistent { - decision: decision.clone(), - native_component: n_pair, - protocol_components: p_pairs, - }); - let announcement = - CommMsgContents::Announce { oracle: decision }.into_msg(self.inner.round_index); + if let Decision::Success(predicate) = &decision { + // overwrite MonoN/P + self.inner.mono_n = + self.ephemeral.poly_n.take().unwrap().choose_mono(predicate).unwrap(); + self.inner.mono_ps.clear(); + self.inner.mono_ps.extend( + self.ephemeral + .poly_ps + .drain(..) + .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), + ) + } + let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); for &child_ekey in self.inner.family.children_ekeys.iter() { log!( &mut self.inner.logger, @@ -71,8 +52,9 @@ impl Controller { } else { // I have no parent -> I'm the leader assert!(self.inner.family.parent_ekey.is_none()); - let maybe_decision = self.ephemeral.solution_storage.iter_new_local_make_old().next(); - Ok(if let Some(decision) = maybe_decision { + let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); + Ok(if let Some(predicate) = maybe_predicate { + let decision = Decision::Success(predicate); log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); self.end_round_with_decision(decision)?; true @@ -86,7 +68,7 @@ impl Controller { &mut self, sync_batches: impl Iterator, ) -> Result { - let MonoN { ekeys, .. } = self.inner.mono_n.take().unwrap(); + let MonoN { ekeys, .. } = self.inner.mono_n.clone(); let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; let mut branches = HashMap::<_, _>::default(); for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { @@ -143,70 +125,30 @@ impl Controller { Ok(PolyN { ekeys, branches }) } - pub fn sync_round( - &mut self, - deadline: Instant, - sync_batches: Option>, - ) -> Result<(), SyncErr> { - if self.consistent() { - match self.sync_round_inner(deadline, sync_batches) { - Ok(()) => return Ok(()), - Err(error) => { - log!( - &mut self.inner.logger, - "/\\/\\/\\/\\/\\/ Sync round failed! Preparing for diagnosis...", - ); - let h = RoundHistory::Inconsistent { - error, - subtree_solutions: std::mem::take(&mut self.ephemeral.solution_storage), - native_component: self.ephemeral.poly_n.take().unwrap(), - protocol_components: std::mem::take(&mut self.ephemeral.poly_ps) - .into_boxed_slice(), - }; - self.round_histories.push(h); - for (round_index, round) in self.round_histories.iter().enumerate() { - log!(&mut self.inner.logger, "round {}:{:#?}\n", round_index, round); - } - } - } - } - if let Some(RoundHistory::Inconsistent { error, .. }) = - self.round_histories.iter().rev().next() - { - Err(error.clone()) - } else { - unreachable!() - } - } - // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. - fn sync_round_inner( + pub fn sync_round( &mut self, deadline: Instant, sync_batches: Option>, ) -> Result<(), SyncErr> { - assert!(self.ephemeral.is_clear()); - log!( &mut self.inner.logger, "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", self.inner.round_index ); + assert!(self.ephemeral.is_clear()); // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). // Some actors are dropped. some new actors are created. // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors - log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.inner.mono_ps.len()); - self.ephemeral.poly_ps.clear(); - // let mut poly_ps: Vec = vec![]; - while let Some(mut mono_p) = self.inner.mono_ps.pop() { + self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned()); + log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); + while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { let mut m_ctx = MonoPContext { ekeys: &mut mono_p.ekeys, + mono_ps: &mut self.ephemeral.mono_ps, inner: &mut self.inner, - // endpoint_exts: &mut self.endpoint_exts, - // mono_ps: &mut self.mono_ps, - // channel_id_stream: &mut self.channel_id_stream, }; // cross boundary into crate::protocol let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); @@ -229,7 +171,7 @@ impl Controller { // TODO: store and update this mapping rather than rebuilding it each round. let ekey_to_holder: HashMap = { use PolyId::*; - let n = self.inner.mono_n.iter().flat_map(|m| m.ekeys.iter().map(move |&e| (e, N))); + let n = self.inner.mono_n.ekeys.iter().map(move |&e| (e, N)); let p = self .ephemeral .poly_ps @@ -247,7 +189,7 @@ impl Controller { // 4. Create the solution storage. it tracks the solutions of "subtrees" // of the controller in the overlay tree. self.ephemeral.solution_storage.reset({ - let n = self.inner.mono_n.iter().map(|_| SubtreeId::PolyN); + let n = std::iter::once(SubtreeId::PolyN); let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index }); let c = self .inner @@ -267,7 +209,6 @@ impl Controller { // 5. kick off the synchronous round of the native actor if it exists log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); - assert_eq!(sync_batches.is_some(), self.inner.mono_n.is_some()); // TODO better err self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { // using if let because of nested ? operator // TODO check that there are 1+ branches or NO SOLUTION @@ -361,6 +302,27 @@ impl Controller { } }; match current_content { + CommMsgContents::Failure => match self.inner.family.parent_ekey { + Some(parent_ekey) => { + let announcement = Msg::CommMsg(CommMsg { + round_index: self.inner.round_index, + contents: CommMsgContents::Failure, + }); + log!( + &mut self.inner.logger, + "Forwarding {:?} to parent with ekey {:?}", + &announcement, + parent_ekey + ); + self.inner + .endpoint_exts + .get_mut(parent_ekey) + .expect("ss") + .endpoint + .send(announcement.clone())?; + } + None => return self.end_round_with_decision(Decision::Failure), + }, CommMsgContents::Elaborate { partial_oracle } => { // Child controller submitted a subtree solution. if !self.inner.family.children_ekeys.contains(&received.recipient) { @@ -382,7 +344,7 @@ impl Controller { return Ok(()); } } - CommMsgContents::Announce { oracle } => { + CommMsgContents::Announce { decision } => { if self.inner.family.parent_ekey != Some(received.recipient) { return Err(SyncErr::AnnounceFromNonParent); } @@ -390,9 +352,9 @@ impl Controller { &mut self.inner.logger, "Received ANNOUNCEMENT from from parent {:?}: {:?}", received.recipient, - &oracle + &decision ); - return self.end_round_with_decision(oracle); + return self.end_round_with_decision(decision); } CommMsgContents::SendPayload { payload_predicate, payload } => { assert_eq!( @@ -489,6 +451,25 @@ impl Controller { } } } + // 'timeout_loop: loop { + // log!(&mut self.inner.logger, "`POLLING (already timed out)`..."); + // let received = self.recv_blocking()?; + // log!(&mut self.inner.logger, "::: message {:?}...", &received); + // let current_content = match received.msg { + // Msg::SetupMsg(_) => { + // // This occurs in the event the connector was malformed during connect() + // return Err(SyncErr::UnexpectedSetupMsg); + // } + // Msg::CommMsg(CommMsg { round_index, contents }) => { + // if round_index > self.inner.round_index { + // self.delay(received); + // continue 'timeout_loop; + // } else { + // contents + // } + // } + // }; + // } } } impl ControllerEphemeral { @@ -496,6 +477,7 @@ impl ControllerEphemeral { self.solution_storage.is_clear() && self.poly_n.is_none() && self.poly_ps.is_empty() + && self.mono_ps.is_empty() && self.ekey_to_holder.is_empty() } fn clear(&mut self) { @@ -540,7 +522,7 @@ impl MonoContext for MonoPContext<'_> { ); if moved_ekeys.is_subset(self.ekeys) { self.ekeys.retain(|x| !moved_ekeys.contains(x)); - self.inner.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys }); + self.mono_ps.push(MonoP { state: init_state, ekeys: moved_ekeys }); } else { panic!("MachineP attempting to move alien ekey!"); } @@ -703,3 +685,9 @@ impl PolyContext for BranchPContext<'_, '_> { val } } + +/* +invariant: Controller.inner has stable MonoN/P states for which it will start the + + +*/ diff --git a/src/runtime/connector.rs b/src/runtime/connector.rs index 3c892b5852b23354adfc89b5249ce03ae638ec21..e56113e1298926048fe66fd8158b186478b905e0 100644 --- a/src/runtime/connector.rs +++ b/src/runtime/connector.rs @@ -161,10 +161,7 @@ impl Connector { // do the synchronous round! connected.controller.sync_round(deadline, Some(connected.sync_batches.drain(..)))?; connected.sync_batches.push(SyncBatch::default()); - - let mono_n = connected.controller.inner.mono_n.as_mut().unwrap(); - let result = mono_n.result.as_mut().unwrap(); - Ok(result.0) + Ok(connected.controller.inner.mono_n.result.as_mut().unwrap().0) } pub fn read_gotten(&self, native_port_index: usize) -> Result<&[u8], ReadGottenErr> { @@ -178,8 +175,7 @@ impl Connector { if polarity != Getter { return Err(WrongPolarity); } - let mono_n = connected.controller.inner.mono_n.as_ref().expect("controller has no mono_n?"); - let result = mono_n.result.as_ref().ok_or(NoPreviousRound)?; + let result = connected.controller.inner.mono_n.result.as_ref().ok_or(NoPreviousRound)?; let payload = result.1.get(&key).ok_or(DidNotGet)?; Ok(payload) } diff --git a/src/runtime/endpoint.rs b/src/runtime/endpoint.rs index e95ca3737b1052d4557ad0dc4613c69ace0b9b6c..5bf6f48d9ee1cdcf027d9332e1c9cc8df4cba735 100644 --- a/src/runtime/endpoint.rs +++ b/src/runtime/endpoint.rs @@ -18,6 +18,12 @@ pub struct EndpointInfo { pub channel_id: ChannelId, } +#[derive(Debug, Clone)] +pub(crate) enum Decision { + Failure, + Success(Predicate), +} + #[derive(Clone, Debug)] pub(crate) enum Msg { SetupMsg(SetupMsg), @@ -45,8 +51,9 @@ pub(crate) struct CommMsg { #[derive(Clone, Debug)] pub(crate) enum CommMsgContents { SendPayload { payload_predicate: Predicate, payload: Payload }, - Elaborate { partial_oracle: Predicate }, - Announce { oracle: Predicate }, + Elaborate { partial_oracle: Predicate }, // SINKWARD + Failure, // SINKWARD + Announce { decision: Decision }, // SINKAWAYS } pub struct NetworkEndpoint { diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 8e24363ee27705cf72012d3e3264efbd972e16e5..4bf276b09662c2af5fcc5e71155123b3a76ff550 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -123,8 +123,8 @@ struct ControllerInner { channel_id_stream: ChannelIdStream, endpoint_exts: Arena, messenger_state: MessengerState, - mono_n: Option, - mono_ps: Vec, + mono_n: MonoN, // state at next round start + mono_ps: Vec, // state at next round start family: ControllerFamily, logger: String, } @@ -135,6 +135,7 @@ struct ControllerEphemeral { solution_storage: SolutionStorage, poly_n: Option, poly_ps: Vec, + mono_ps: Vec, ekey_to_holder: HashMap, } @@ -168,6 +169,7 @@ pub(crate) enum SubtreeId { pub(crate) struct MonoPContext<'a> { inner: &'a mut ControllerInner, ekeys: &'a mut HashSet, + mono_ps: &'a mut Vec, } pub(crate) struct PolyPContext<'a> { my_subtree_id: SubtreeId, @@ -246,15 +248,10 @@ trait Messengerlike { } } } - - // attempt to receive a message from one of the endpoints before the deadline - fn recv_until( - &mut self, - deadline: Option, - ) -> Result, MessengerRecvErr> { + fn recv_blocking(&mut self) -> Result { // try get something buffered if let Some(x) = self.get_state_mut().undelayed.pop() { - return Ok(Some(x)); + return Ok(x); } loop { @@ -267,19 +264,18 @@ trait Messengerlike { { // this endpoint MAY still have messages! check again in future self.get_state_mut().polled_undrained.insert(eekey); - return Ok(Some(ReceivedMsg { recipient: eekey, msg })); + return Ok(ReceivedMsg { recipient: eekey, msg }); } } let state = self.get_state_mut(); - match state.poll_events_until(deadline) { - Ok(()) => { - for e in state.events.iter() { - state.polled_undrained.insert(Key::from_token(e.token())); - } - } - Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), - Err(PollDeadlineErr::Timeout) => return Ok(None), + + state + .poll + .poll(&mut state.events, None) + .map_err(|_| MessengerRecvErr::PollingFailed)?; + for e in state.events.iter() { + state.polled_undrained.insert(Key::from_token(e.token())); } } } @@ -357,15 +353,6 @@ impl ChannelIdStream { } impl MessengerState { - fn with_event_capacity(event_capacity: usize) -> Result { - Ok(Self { - poll: Poll::new()?, - events: Events::with_capacity(event_capacity), - delayed: Default::default(), - undelayed: Default::default(), - polled_undrained: Default::default(), - }) - } // does NOT guarantee that events is non-empty fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> { use PollDeadlineErr::*; @@ -374,17 +361,6 @@ impl MessengerState { self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; Ok(()) } - fn poll_events_until(&mut self, deadline: Option) -> Result<(), PollDeadlineErr> { - use PollDeadlineErr::*; - self.events.clear(); - let poll_timeout = if let Some(d) = deadline { - Some(d.checked_duration_since(Instant::now()).ok_or(Timeout)?) - } else { - None - }; - self.poll.poll(&mut self.events, poll_timeout).map_err(|_| PollingFailed)?; - Ok(()) - } } impl From for ConnectErr { fn from(e: PollDeadlineErr) -> ConnectErr { diff --git a/src/runtime/serde.rs b/src/runtime/serde.rs index d44f535bed337d08a847d52a5bcaefd104872ffc..589c5f0f5c639310fd0b087e78403e1f50249a54 100644 --- a/src/runtime/serde.rs +++ b/src/runtime/serde.rs @@ -1,6 +1,6 @@ use crate::common::*; use crate::runtime::{ - endpoint::{CommMsg, CommMsgContents, EndpointInfo, Msg, SetupMsg}, + endpoint::{CommMsg, CommMsgContents, Decision, EndpointInfo, Msg, SetupMsg}, Predicate, }; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; @@ -180,6 +180,27 @@ impl De for R { Ok(Predicate { assigned }) } } +impl Ser for W { + fn ser(&mut self, t: &Decision) -> Result<(), std::io::Error> { + match t { + Decision::Failure => self.ser(&b'F'), + Decision::Success(predicate) => { + self.ser(&b'S')?; + self.ser(predicate) + } + } + } +} +impl De for R { + fn de(&mut self) -> Result { + let b: u8 = self.de()?; + Ok(match b { + b'F' => Decision::Failure, + b'S' => Decision::Success(self.de()?), + _ => return Err(InvalidData.into()), + }) + } +} impl Ser for W { fn ser(&mut self, t: &Polarity) -> Result<(), std::io::Error> { @@ -229,7 +250,8 @@ impl Ser for W { ser_seq![self, &4u8, zig, payload_predicate, payload] } Elaborate { partial_oracle } => ser_seq![self, &5u8, zig, partial_oracle], - Announce { oracle } => ser_seq![self, &6u8, zig, oracle], + Announce { decision } => ser_seq![self, &6u8, zig, decision], + Failure => ser_seq![self, &7u8], } } } @@ -252,7 +274,8 @@ impl De for R { let contents = match b { 4 => SendPayload { payload_predicate: self.de()?, payload: self.de()? }, 5 => Elaborate { partial_oracle: self.de()? }, - 6 => Announce { oracle: self.de()? }, + 6 => Announce { decision: self.de()? }, + 7 => Failure, _ => return Err(InvalidData.into()), }; Msg::CommMsg(CommMsg { round_index: zig as usize, contents }) @@ -260,49 +283,3 @@ impl De for R { }) } } - -///////////////// - -// #[test] -// fn my_serde() -> Result<(), std::io::Error> { -// let payload_predicate = Predicate { -// assigned: maplit::btreemap! { ChannelId {controller_id: 3, channel_index: 9} => false }, -// }; -// let msg = Msg::CommMsg(CommMsg { -// round_index: !0, -// contents: CommMsgContents::SendPayload { -// payload_predicate, -// payload: (0..).take(2).collect(), -// }, -// }); -// let mut v = vec![]; -// v.ser(&msg)?; -// print!("["); -// for (i, &x) in v.iter().enumerate() { -// print!("{:02x}", x); -// if i % 4 == 3 { -// print!(" "); -// } -// } -// println!("]"); - -// let msg2: Msg = (&v[..]).de()?; -// println!("msg2 {:#?}", msg2); -// Ok(()) -// } - -// #[test] -// fn varint() { -// let mut v = vec![]; -// v.ser(&ZigZag(!0)).unwrap(); -// for (i, x) in v.iter_mut().enumerate() { -// print!("{:02x}", x); -// if i % 4 == 3 { -// print!(" "); -// } -// } -// *v.iter_mut().last().unwrap() |= 3; - -// let ZigZag(x) = De::de(&mut &v[..]).unwrap(); -// println!(""); -// } diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 76092a343447c3472b145338036282ce5b169f48..1d0516e743654f6d1e34fad887f10284e989a62f 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -100,7 +100,7 @@ impl Controller { let (mut messenger_state, mut endpoint_exts) = Self::finish_endpoint_ext_todos(major, &mut logger, endpoint_ext_todos, deadline)?; - let n_mono = Some(MonoN { ekeys: ekeys_native.into_iter().collect(), result: None }); + let n_mono = MonoN { ekeys: ekeys_native.into_iter().collect(), result: None }; let p_monos = vec![MonoP { state: protocol_description.new_main_component(main_component, &ekeys_proto), ekeys: ekeys_proto.into_iter().collect(),