diff --git a/src/common.rs b/src/common.rs index 9b96d11052a9f93b869515d8e3a79e6a83a8af15..698ac11837209e13af4a196cc69036d1c85e89e6 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,7 +1,7 @@ ///////////////////// PRELUDE ///////////////////// pub use crate::protocol::{ComponentState, ProtocolDescription}; -pub use crate::runtime::{NonsyncContext, SyncContext}; +pub use crate::runtime::{NonsyncProtoContext, SyncProtoContext}; pub use core::{ cmp::Ordering, @@ -140,3 +140,13 @@ impl Debug for PortId { write!(f, "PortId({},{})", self.controller_id, self.port_index) } } +impl std::ops::Not for Polarity { + type Output = Self; + fn not(self) -> Self::Output { + use Polarity::*; + match self { + Putter => Getter, + Getter => Putter, + } + } +} diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 9bbc5c2e0c4750de5971f78716eeeee48e251c52..4a901047cbc4513c18fb04d3f58a511a93ae694a 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -23,8 +23,8 @@ pub struct ComponentState { prompt: Prompt, } pub enum EvalContext<'a> { - Nonsync(&'a mut NonsyncContext<'a>), - Sync(&'a mut SyncContext<'a>), + Nonsync(&'a mut NonsyncProtoContext<'a>), + Sync(&'a mut SyncProtoContext<'a>), None, } ////////////////////////////////////////////// @@ -111,7 +111,7 @@ impl ProtocolDescription { impl ComponentState { pub fn nonsync_run<'a: 'b, 'b>( &'a mut self, - context: &'b mut NonsyncContext<'b>, + context: &'b mut NonsyncProtoContext<'b>, pd: &'a ProtocolDescription, ) -> NonsyncBlocker { let mut context = EvalContext::Nonsync(context); @@ -147,7 +147,7 @@ impl ComponentState { pub fn sync_run<'a: 'b, 'b>( &'a mut self, - context: &'b mut SyncContext<'b>, + context: &'b mut SyncProtoContext<'b>, pd: &'a ProtocolDescription, ) -> SyncBlocker { let mut context = EvalContext::Sync(context); diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index e619b0712bbc97002d4b7cb762fd5e264fafdb49..4ddefc9f54cef6af8c49f37b6db0e5cd6cf5eb4c 100644 --- 
a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -1,739 +1,736 @@ +use super::*; use crate::common::*; -use crate::runtime::{actors::*, endpoint::*, errors::*, *}; - -impl Controller { - fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> { - log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); - let ret = match &decision { - Decision::Success(predicate) => { - // overwrite MonoN/P - self.inner.mono_n = { - let poly_n = self.ephemeral.poly_n.take().unwrap(); - poly_n.choose_mono(predicate).unwrap_or_else(|| { - panic!( - "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}", - &predicate, &poly_n.branches, &self.inner.logger - ); - }) - }; - self.inner.mono_ps.clear(); - self.inner.mono_ps.extend( - self.ephemeral - .poly_ps - .drain(..) - .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), - ); - Ok(()) - } - Decision::Failure => Err(SyncErr::Timeout), - }; - let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); - for &child_port in self.inner.family.children_ports.iter() { - log!( - &mut self.inner.logger, - "Forwarding {:?} to child with port {:?}", - &announcement, - child_port - ); - self.inner - .endpoint_exts - .get_mut(child_port) - .expect("eefef") - .endpoint - .send(announcement.clone())?; - } - self.inner.round_index += 1; - self.ephemeral.clear(); - ret +impl NonsyncProtoContext<'_> { + pub fn new_component(&mut self, moved_ports: HashSet, init_state: ComponentState) { + let component = &mut self.connector.proto_components[self.proto_component_index]; + assert!(component.ports.is_subset(&moved_ports)); + // let polarities = self.proto_description.component_polarities(identifier).expect("BAD "); + // if polarities.len() != ports.len() { + // return Err(WrongNumberOfParamaters { expected: polarities.len() }); + // } + // for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { + // if 
!self.native_ports.contains(port) { + // return Err(UnknownPort(*port)); + // } + // if expected_polarity != self.check_polarity(port) { + // return Err(WrongPortPolarity { port: *port, expected_polarity }); + // } + // } + // // ok! + // let state = self.proto_description.new_main_component(identifier, ports); + // let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state }; + // let proto_component_index = self.proto_components.len(); + // self.proto_components.push(proto_component); + // for port in ports.iter() { + // if let Polarity::Getter = self.check_polarity(port) { + // self.inp_to_route.insert( + // *port, + // InpRoute::LocalComponent(LocalComponentId::Proto { + // index: proto_component_index, + // }), + // ); + // } + // } } - - // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found - fn handle_locals_maybe_decide(&mut self) -> Result { - if let Some(parent_port) = self.inner.family.parent_port { - // I have a parent -> I'm not the leader - let parent_endpoint = - &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; - for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { - let msg = - CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); - log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port); - parent_endpoint.send(msg)?; - } - Ok(false) - } else { - // I have no parent -> I'm the leader - assert!(self.inner.family.parent_port.is_none()); - let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); - Ok(if let Some(predicate) = maybe_predicate { - let decision = Decision::Success(predicate); - log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); - self.end_round_with_decision(decision)?; - true - } else { - false - }) - } + pub fn new_channel(&mut self) -> [PortId; 2] { + self.connector.add_port_pair() + } +} +impl SyncProtoContext<'_> 
{ + pub fn is_firing(&mut self, port: PortId) -> Option { + todo!() + } + pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { + todo!() } +} - fn kick_off_native( - &mut self, - sync_batches: impl Iterator, - ) -> Result { - let MonoN { ports, .. } = self.inner.mono_n.clone(); - let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; - let mut branches = HashMap::<_, _>::default(); - for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { - let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id; - let all_ports = ports.iter().copied(); - let all_channel_ids = all_ports.map(port_to_channel_id); +// impl Connector { +// fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncError> { +// log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); +// let ret = match &decision { +// Decision::Success(predicate) => { +// // overwrite MonoN/P +// self.inner.mono_n = { +// let poly_n = self.ephemeral.poly_n.take().unwrap(); +// poly_n.choose_mono(predicate).unwrap_or_else(|| { +// panic!( +// "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}", +// &predicate, &poly_n.branches, &self.inner.logger +// ); +// }) +// }; +// self.inner.mono_ps.clear(); +// self.inner.mono_ps.extend( +// self.ephemeral +// .poly_ps +// .drain(..) 
+// .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), +// ); +// Ok(()) +// } +// Decision::Failure => Err(SyncError::Timeout), +// }; +// let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); +// for &child_port in self.inner.family.children_ports.iter() { +// log!( +// &mut self.inner.logger, +// "Forwarding {:?} to child with port {:?}", +// &announcement, +// child_port +// ); +// self.inner +// .endpoint_exts +// .get_mut(child_port) +// .expect("eefef") +// .endpoint +// .send(announcement.clone())?; +// } +// self.inner.round_index += 1; +// self.ephemeral.clear(); +// ret +// } - let mut predicate = Predicate::new_trivial(); +// // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found +// fn handle_locals_maybe_decide(&mut self) -> Result { +// if let Some(parent_port) = self.inner.family.parent_port { +// // I have a parent -> I'm not the leader +// let parent_endpoint = +// &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; +// for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { +// let msg = +// CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); +// log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port); +// parent_endpoint.send(msg)?; +// } +// Ok(false) +// } else { +// // I have no parent -> I'm the leader +// assert!(self.inner.family.parent_port.is_none()); +// let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); +// Ok(if let Some(predicate) = maybe_predicate { +// let decision = Decision::Success(predicate); +// log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); +// self.end_round_with_decision(decision)?; +// true +// } else { +// false +// }) +// } +// } - // assign TRUE for puts and gets - let true_ports = puts.keys().chain(gets.iter()).copied(); - let true_channel_ids = 
true_ports.clone().map(port_to_channel_id); - predicate.batch_assign_nones(true_channel_ids, true); +// fn kick_off_native( +// &mut self, +// sync_batches: impl Iterator, +// ) -> Result { +// let MonoN { ports, .. } = self.inner.mono_n.clone(); +// let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; +// let mut branches = HashMap::<_, _>::default(); +// for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { +// let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id; +// let all_ports = ports.iter().copied(); +// let all_channel_ids = all_ports.map(port_to_channel_id); - // assign FALSE for all in interface not assigned true - predicate.batch_assign_nones(all_channel_ids.clone(), false); +// let mut predicate = Predicate::new_trivial(); - if branches.contains_key(&predicate) { - // TODO what do I do with redundant predicates? - unimplemented!( - "Duplicate predicate {:#?}!\nHaving multiple batches with the same - predicate requires the support of oracle boolean variables", - &predicate, - ) - } - let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; - for (port, payload) in puts { - log!( - &mut self.inner.logger, - "... ... Initial native put msg {:?} pred {:?} batch {:?}", - &payload, - &predicate, - sync_batch_index, - ); - let msg = - CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } - .into_msg(*round_index); - endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; - } - log!( - &mut self.inner.logger, - "... 
Initial native branch batch index={} with pred {:?}", - sync_batch_index, - &predicate - ); - if branch.to_get.is_empty() { - self.ephemeral.solution_storage.submit_and_digest_subtree_solution( - &mut self.inner.logger, - SubtreeId::PolyN, - predicate.clone(), - ); - } - branches.insert(predicate, branch); - } - Ok(PolyN { ports, branches }) - } - pub fn sync_round( - &mut self, - deadline: Option, - sync_batches: Option>, - ) -> Result<(), SyncErr> { - if let Some(e) = self.unrecoverable_error { - return Err(e.clone()); - } - self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e { - SyncErr::Timeout => e, // this isn't unrecoverable - _ => { - // Must set unrecoverable error! and tear down our net channels - self.unrecoverable_error = Some(e); - self.ephemeral.clear(); - self.inner.endpoint_exts = Default::default(); - e - } - }) - } +// // assign TRUE for puts and gets +// let true_ports = puts.keys().chain(gets.iter()).copied(); +// let true_channel_ids = true_ports.clone().map(port_to_channel_id); +// predicate.batch_assign_nones(true_channel_ids, true); - // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. - // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. - fn sync_round_inner( - &mut self, - mut deadline: Option, - sync_batches: Option>, - ) -> Result<(), SyncErr> { - log!( - &mut self.inner.logger, - "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", - self.inner.round_index - ); - assert!(self.ephemeral.is_clear()); - assert!(self.unrecoverable_error.is_none()); +// // assign FALSE for all in interface not assigned true +// predicate.batch_assign_nones(all_channel_ids.clone(), false); - // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). - // Some actors are dropped. some new actors are created. 
- // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors - self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned()); - log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); - while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { - let mut m_ctx = MonoPContext { - ports: &mut mono_p.ports, - mono_ps: &mut self.ephemeral.mono_ps, - inner: &mut self.inner, - }; - // cross boundary into crate::protocol - let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); - log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); - match blocker { - MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent), - MonoBlocker::ComponentExit => drop(mono_p), - MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()), - } - } - log!( - &mut self.inner.logger, - "Finished running all MonoPs! Have {} PolyPs waiting", - self.ephemeral.poly_ps.len() - ); +// if branches.contains_key(&predicate) { +// // TODO what do I do with redundant predicates? +// unimplemented!( +// "Duplicate predicate {:#?}!\nHaving multiple batches with the same +// predicate requires the support of oracle boolean variables", +// &predicate, +// ) +// } +// let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; +// for (port, payload) in puts { +// log!( +// &mut self.inner.logger, +// "... ... Initial native put msg {:?} pred {:?} batch {:?}", +// &payload, +// &predicate, +// sync_batch_index, +// ); +// let msg = +// CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } +// .into_msg(*round_index); +// endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; +// } +// log!( +// &mut self.inner.logger, +// "... 
Initial native branch batch index={} with pred {:?}", +// sync_batch_index, +// &predicate +// ); +// if branch.to_get.is_empty() { +// self.ephemeral.solution_storage.submit_and_digest_subtree_solution( +// &mut self.inner.logger, +// SubtreeId::PolyN, +// predicate.clone(), +// ); +// } +// branches.insert(predicate, branch); +// } +// Ok(PolyN { ports, branches }) +// } +// pub fn sync_round( +// &mut self, +// deadline: Option, +// sync_batches: Option>, +// ) -> Result<(), SyncError> { +// if let Some(e) = self.unrecoverable_error { +// return Err(e.clone()); +// } +// self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e { +// SyncError::Timeout => e, // this isn't unrecoverable +// _ => { +// // Must set unrecoverable error! and tear down our net channels +// self.unrecoverable_error = Some(e); +// self.ephemeral.clear(); +// self.inner.endpoint_exts = Default::default(); +// e +// } +// }) +// } - // 3. define the mapping from port -> actor - // this is needed during the event loop to determine which actor - // should receive the incoming message. - // TODO: store and update this mapping rather than rebuilding it each round. - let port_to_holder: HashMap = { - use PolyId::*; - let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N)); - let p = self - .ephemeral - .poly_ps - .iter() - .enumerate() - .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index }))); - n.chain(p).collect() - }; - log!( - &mut self.inner.logger, - "SET OF PolyPs and MonoPs final! port lookup map is {:?}", - &port_to_holder - ); +// // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. +// // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. +// fn sync_round_inner( +// &mut self, +// mut deadline: Option, +// sync_batches: Option>, +// ) -> Result<(), SyncError> { +// log!( +// &mut self.inner.logger, +// "~~~~~~~~ SYNC ROUND STARTS! 
ROUND={} ~~~~~~~~~", +// self.inner.round_index +// ); +// assert!(self.ephemeral.is_clear()); +// assert!(self.unrecoverable_error.is_none()); - // 4. Create the solution storage. it tracks the solutions of "subtrees" - // of the controller in the overlay tree. - self.ephemeral.solution_storage.reset({ - let n = std::iter::once(SubtreeId::PolyN); - let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index }); - let c = self - .inner - .family - .children_ports - .iter() - .map(|&port| SubtreeId::ChildController { port }); - let subtree_id_iter = n.chain(m).chain(c); - log!( - &mut self.inner.logger, - "Solution Storage has subtree Ids: {:?}", - &subtree_id_iter.clone().collect::>() - ); - subtree_id_iter - }); +// // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). +// // Some actors are dropped. some new actors are created. +// // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors +// self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned()); +// log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); +// while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { +// let mut m_ctx = ProtoSyncContext { +// ports: &mut mono_p.ports, +// mono_ps: &mut self.ephemeral.mono_ps, +// inner: &mut self.inner, +// }; +// // cross boundary into crate::protocol +// let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); +// log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); +// match blocker { +// NonsyncBlocker::Inconsistent => return Err(SyncError::Inconsistent), +// NonsyncBlocker::ComponentExit => drop(mono_p), +// NonsyncBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()), +// } +// } +// log!( +// &mut self.inner.logger, +// "Finished running all MonoPs! Have {} PolyPs waiting", +// self.ephemeral.poly_ps.len() +// ); - // 5. kick off the synchronous round of the native actor if it exists +// // 3. 
define the mapping from port -> actor +// // this is needed during the event loop to determine which actor +// // should receive the incoming message. +// // TODO: store and update this mapping rather than rebuilding it each round. +// let port_to_holder: HashMap = { +// use PolyId::*; +// let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N)); +// let p = self +// .ephemeral +// .poly_ps +// .iter() +// .enumerate() +// .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index }))); +// n.chain(p).collect() +// }; +// log!( +// &mut self.inner.logger, +// "SET OF PolyPs and MonoPs final! port lookup map is {:?}", +// &port_to_holder +// ); - log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); - self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { - // using if let because of nested ? operator - // TODO check that there are 1+ branches or NO SOLUTION - let poly_n = self.kick_off_native(sync_batches)?; - log!( - &mut self.inner.logger, - "PolyN kicked off, and has branches with predicates... {:?}", - poly_n.branches.keys().collect::>() - ); - Some(poly_n) - } else { - log!(&mut self.inner.logger, "NO NATIVE COMPONENT"); - None - }; +// // 4. Create the solution storage. it tracks the solutions of "subtrees" +// // of the controller in the overlay tree. +// self.ephemeral.solution_storage.reset({ +// let n = std::iter::once(SubtreeId::PolyN); +// let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index }); +// let c = self +// .inner +// .family +// .children_ports +// .iter() +// .map(|&port| SubtreeId::ChildController { port }); +// let subtree_id_iter = n.chain(m).chain(c); +// log!( +// &mut self.inner.logger, +// "Solution Storage has subtree Ids: {:?}", +// &subtree_id_iter.clone().collect::>() +// ); +// subtree_id_iter +// }); - // 6. Kick off the synchronous round of each protocol actor - // If just one actor becomes inconsistent now, there can be no solution! 
- // TODO distinguish between completed and not completed poly_p's? - log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); - for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { - let my_subtree_id = SubtreeId::PolyP { index }; - let m_ctx = PolyPContext { - my_subtree_id, - inner: &mut self.inner, - solution_storage: &mut self.ephemeral.solution_storage, - }; - use SyncRunResult as Srr; - let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; - log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker); - match blocker { - Srr::NoBranches => return Err(SyncErr::Inconsistent), - Srr::AllBranchesComplete | Srr::BlockingForRecv => (), - } - } - log!(&mut self.inner.logger, "All Poly machines have been kicked off!"); +// // 5. kick off the synchronous round of the native actor if it exists - // 7. `solution_storage` may have new solutions for this controller - // handle their discovery. LEADER => announce, otherwise => send to parent - { - let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); - log!( - &mut self.inner.logger, - "Got {} controller-local solutions before a single RECV: {:?}", - peeked.len(), - peeked - ); - } - if self.handle_locals_maybe_decide()? { - return Ok(()); - } +// log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); +// self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { +// // using if let because of nested ? operator +// // TODO check that there are 1+ branches or NO SOLUTION +// let poly_n = self.kick_off_native(sync_batches)?; +// log!( +// &mut self.inner.logger, +// "PolyN kicked off, and has branches with predicates... {:?}", +// poly_n.branches.keys().collect::>() +// ); +// Some(poly_n) +// } else { +// log!(&mut self.inner.logger, "NO NATIVE COMPONENT"); +// None +// }; - // 4. 
Receive incoming messages until the DECISION is made OR some unrecoverable error - log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages"); - self.undelay_all(); - 'recv_loop: loop { - log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline); - let received = match deadline { - None => { - // we have personally timed out. perform a "long" poll. - self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP") - } - Some(d) => match self.recv(d)? { - // we have not yet timed out. performed a time-limited poll - Some(received) => received, - None => { - // timed out! send a FAILURE message to the sink, - // and henceforth don't time out on polling. - deadline = None; - match self.inner.family.parent_port { - None => { - // I am the sink! announce failure and return. - return self.end_round_with_decision(Decision::Failure); - } - Some(parent_port) => { - // I am not the sink! send a failure message. - let announcement = Msg::CommMsg(CommMsg { - round_index: self.inner.round_index, - contents: CommMsgContents::Failure, - }); - log!( - &mut self.inner.logger, - "Forwarding {:?} to parent with port {:?}", - &announcement, - parent_port - ); - self.inner - .endpoint_exts - .get_mut(parent_port) - .expect("ss") - .endpoint - .send(announcement.clone())?; - continue; // poll some more - } - } - } - }, - }; - log!(&mut self.inner.logger, "::: message {:?}...", &received); - let current_content = match received.msg { - Msg::SetupMsg(s) => { - // This occurs in the event the connector was malformed during connect() - println!("WASNT EXPECTING {:?}", s); - return Err(SyncErr::UnexpectedSetupMsg); - } - Msg::CommMsg(CommMsg { round_index, .. }) - if round_index < self.inner.round_index => - { - // Old message! Can safely discard - log!(&mut self.inner.logger, "...and its OLD! :("); - drop(received); - continue 'recv_loop; - } - Msg::CommMsg(CommMsg { round_index, .. 
}) - if round_index > self.inner.round_index => - { - // Message from a next round. Keep for later! - log!(&mut self.inner.logger, "... DELAY! :("); - self.delay(received); - continue 'recv_loop; - } - Msg::CommMsg(CommMsg { contents, round_index }) => { - log!( - &mut self.inner.logger, - "... its a round-appropriate CommMsg with port {:?}", - received.recipient - ); - assert_eq!(round_index, self.inner.round_index); - contents - } - }; - match current_content { - CommMsgContents::Failure => match self.inner.family.parent_port { - Some(parent_port) => { - let announcement = Msg::CommMsg(CommMsg { - round_index: self.inner.round_index, - contents: CommMsgContents::Failure, - }); - log!( - &mut self.inner.logger, - "Forwarding {:?} to parent with port {:?}", - &announcement, - parent_port - ); - self.inner - .endpoint_exts - .get_mut(parent_port) - .expect("ss") - .endpoint - .send(announcement.clone())?; - } - None => return self.end_round_with_decision(Decision::Failure), - }, - CommMsgContents::Elaborate { partial_oracle } => { - // Child controller submitted a subtree solution. - if !self.inner.family.children_ports.contains(&received.recipient) { - return Err(SyncErr::ElaborateFromNonChild); - } - let subtree_id = SubtreeId::ChildController { port: received.recipient }; - log!( - &mut self.inner.logger, - "Received elaboration from child for subtree {:?}: {:?}", - subtree_id, - &partial_oracle - ); - self.ephemeral.solution_storage.submit_and_digest_subtree_solution( - &mut self.inner.logger, - subtree_id, - partial_oracle, - ); - if self.handle_locals_maybe_decide()? 
{ - return Ok(()); - } - } - CommMsgContents::Announce { decision } => { - if self.inner.family.parent_port != Some(received.recipient) { - return Err(SyncErr::AnnounceFromNonParent); - } - log!( - &mut self.inner.logger, - "Received ANNOUNCEMENT from from parent {:?}: {:?}", - received.recipient, - &decision - ); - return self.end_round_with_decision(decision); - } - CommMsgContents::SendPayload { payload_predicate, payload } => { - // check that we expect to be able to receive payloads from this sender - assert_eq!( - Getter, - self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity - ); +// // 6. Kick off the synchronous round of each protocol actor +// // If just one actor becomes inconsistent now, there can be no solution! +// // TODO distinguish between completed and not completed poly_p's? +// log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); +// for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { +// let my_subtree_id = SubtreeId::PolyP { index }; +// let m_ctx = PolyPContext { +// my_subtree_id, +// inner: &mut self.inner, +// solution_storage: &mut self.ephemeral.solution_storage, +// }; +// use SyncRunResult as Srr; +// let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; +// log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker); +// match blocker { +// Srr::NoBranches => return Err(SyncError::Inconsistent), +// Srr::AllBranchesComplete | Srr::BlockingForRecv => (), +// } +// } +// log!(&mut self.inner.logger, "All Poly machines have been kicked off!"); - // message for some actor. Feed it to the appropriate actor - // and then give them another chance to run. 
- let subtree_id = port_to_holder.get(&received.recipient); - log!( - &mut self.inner.logger, - "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", - subtree_id, - &payload_predicate, - &payload - ); - let channel_id = self - .inner - .endpoint_exts - .get(received.recipient) - .expect("UEHFU") - .info - .channel_id; - if payload_predicate.query(channel_id) != Some(true) { - // sender didn't preserve the invariant - return Err(SyncErr::PayloadPremiseExcludesTheChannel(channel_id)); - } - match subtree_id { - None => { - // this happens when a message is sent to a component that has exited. - // It's safe to drop this message; - // The sender branch will certainly not be part of the solution - } - Some(PolyId::N) => { - // Message for NativeMachine - self.ephemeral.poly_n.as_mut().unwrap().sync_recv( - received.recipient, - &mut self.inner.logger, - payload, - payload_predicate, - &mut self.ephemeral.solution_storage, - ); - if self.handle_locals_maybe_decide()? { - return Ok(()); - } - } - Some(PolyId::P { index }) => { - // Message for protocol actor - let poly_p = &mut self.ephemeral.poly_ps[*index]; +// // 7. `solution_storage` may have new solutions for this controller +// // handle their discovery. LEADER => announce, otherwise => send to parent +// { +// let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); +// log!( +// &mut self.inner.logger, +// "Got {} controller-local solutions before a single RECV: {:?}", +// peeked.len(), +// peeked +// ); +// } +// if self.handle_locals_maybe_decide()? { +// return Ok(()); +// } - let m_ctx = PolyPContext { - my_subtree_id: SubtreeId::PolyP { index: *index }, - inner: &mut self.inner, - solution_storage: &mut self.ephemeral.solution_storage, - }; - use SyncRunResult as Srr; - let blocker = poly_p.poly_recv_run( - m_ctx, - &self.protocol_description, - received.recipient, - payload_predicate, - payload, - )?; - log!( - &mut self.inner.logger, - "... 
Fed the msg to PolyP {:?} and ran it to blocker {:?}", - subtree_id, - blocker - ); - match blocker { - Srr::NoBranches => return Err(SyncErr::Inconsistent), - Srr::BlockingForRecv | Srr::AllBranchesComplete => { - { - let peeked = self - .ephemeral - .solution_storage - .peek_new_locals() - .collect::>(); - log!( - &mut self.inner.logger, - "Got {} new controller-local solutions from RECV: {:?}", - peeked.len(), - peeked - ); - } - if self.handle_locals_maybe_decide()? { - return Ok(()); - } - } - } - } - }; - } - } - } - } -} -impl ControllerEphemeral { - fn is_clear(&self) -> bool { - self.solution_storage.is_clear() - && self.poly_n.is_none() - && self.poly_ps.is_empty() - && self.mono_ps.is_empty() - && self.port_to_holder.is_empty() - } - fn clear(&mut self) { - self.solution_storage.clear(); - self.poly_n.take(); - self.poly_ps.clear(); - self.port_to_holder.clear(); - } -} -impl Into for MonoP { - fn into(self) -> PolyP { - PolyP { - complete: Default::default(), - incomplete: hashmap! { - Predicate::new_trivial() => - BranchP { - state: self.state, - inbox: Default::default(), - outbox: Default::default(), - blocking_on: None, - } - }, - ports: self.ports, - } - } -} +// // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error +// log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages"); +// self.undelay_all(); +// 'recv_loop: loop { +// log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline); +// let received = match deadline { +// None => { +// // we have personally timed out. perform a "long" poll. +// self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP") +// } +// Some(d) => match self.recv(d)? { +// // we have not yet timed out. performed a time-limited poll +// Some(received) => received, +// None => { +// // timed out! send a FAILURE message to the sink, +// // and henceforth don't time out on polling. 
+// deadline = None; +// match self.inner.family.parent_port { +// None => { +// // I am the sink! announce failure and return. +// return self.end_round_with_decision(Decision::Failure); +// } +// Some(parent_port) => { +// // I am not the sink! send a failure message. +// let announcement = Msg::CommMsg(CommMsg { +// round_index: self.inner.round_index, +// contents: CommMsgContents::Failure, +// }); +// log!( +// &mut self.inner.logger, +// "Forwarding {:?} to parent with port {:?}", +// &announcement, +// parent_port +// ); +// self.inner +// .endpoint_exts +// .get_mut(parent_port) +// .expect("ss") +// .endpoint +// .send(announcement.clone())?; +// continue; // poll some more +// } +// } +// } +// }, +// }; +// log!(&mut self.inner.logger, "::: message {:?}...", &received); +// let current_content = match received.msg { +// Msg::SetupMsg(s) => { +// // This occurs in the event the connector was malformed during connect() +// println!("WASNT EXPECTING {:?}", s); +// return Err(SyncError::UnexpectedSetupMsg); +// } +// Msg::CommMsg(CommMsg { round_index, .. }) +// if round_index < self.inner.round_index => +// { +// // Old message! Can safely discard +// log!(&mut self.inner.logger, "...and its OLD! :("); +// drop(received); +// continue 'recv_loop; +// } +// Msg::CommMsg(CommMsg { round_index, .. }) +// if round_index > self.inner.round_index => +// { +// // Message from a next round. Keep for later! +// log!(&mut self.inner.logger, "... DELAY! :("); +// self.delay(received); +// continue 'recv_loop; +// } +// Msg::CommMsg(CommMsg { contents, round_index }) => { +// log!( +// &mut self.inner.logger, +// "... 
its a round-appropriate CommMsg with port {:?}", +// received.recipient +// ); +// assert_eq!(round_index, self.inner.round_index); +// contents +// } +// }; +// match current_content { +// CommMsgContents::Failure => match self.inner.family.parent_port { +// Some(parent_port) => { +// let announcement = Msg::CommMsg(CommMsg { +// round_index: self.inner.round_index, +// contents: CommMsgContents::Failure, +// }); +// log!( +// &mut self.inner.logger, +// "Forwarding {:?} to parent with port {:?}", +// &announcement, +// parent_port +// ); +// self.inner +// .endpoint_exts +// .get_mut(parent_port) +// .expect("ss") +// .endpoint +// .send(announcement.clone())?; +// } +// None => return self.end_round_with_decision(Decision::Failure), +// }, +// CommMsgContents::Elaborate { partial_oracle } => { +// // Child controller submitted a subtree solution. +// if !self.inner.family.children_ports.contains(&received.recipient) { +// return Err(SyncError::ElaborateFromNonChild); +// } +// let subtree_id = SubtreeId::ChildController { port: received.recipient }; +// log!( +// &mut self.inner.logger, +// "Received elaboration from child for subtree {:?}: {:?}", +// subtree_id, +// &partial_oracle +// ); +// self.ephemeral.solution_storage.submit_and_digest_subtree_solution( +// &mut self.inner.logger, +// subtree_id, +// partial_oracle, +// ); +// if self.handle_locals_maybe_decide()? 
{ +// return Ok(()); +// } +// } +// CommMsgContents::Announce { decision } => { +// if self.inner.family.parent_port != Some(received.recipient) { +// return Err(SyncError::AnnounceFromNonParent); +// } +// log!( +// &mut self.inner.logger, +// "Received ANNOUNCEMENT from from parent {:?}: {:?}", +// received.recipient, +// &decision +// ); +// return self.end_round_with_decision(decision); +// } +// CommMsgContents::SendPayload { payload_predicate, payload } => { +// // check that we expect to be able to receive payloads from this sender +// assert_eq!( +// Getter, +// self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity +// ); -impl From for SyncErr { - fn from(e: EndpointErr) -> SyncErr { - SyncErr::EndpointErr(e) - } -} +// // message for some actor. Feed it to the appropriate actor +// // and then give them another chance to run. +// let subtree_id = port_to_holder.get(&received.recipient); +// log!( +// &mut self.inner.logger, +// "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", +// subtree_id, +// &payload_predicate, +// &payload +// ); +// let channel_id = self +// .inner +// .endpoint_exts +// .get(received.recipient) +// .expect("UEHFU") +// .info +// .channel_id; +// if payload_predicate.query(channel_id) != Some(true) { +// // sender didn't preserve the invariant +// return Err(SyncError::PayloadPremiseExcludesTheChannel(channel_id)); +// } +// match subtree_id { +// None => { +// // this happens when a message is sent to a component that has exited. +// // It's safe to drop this message; +// // The sender branch will certainly not be part of the solution +// } +// Some(PolyId::N) => { +// // Message for NativeMachine +// self.ephemeral.poly_n.as_mut().unwrap().sync_recv( +// received.recipient, +// &mut self.inner.logger, +// payload, +// payload_predicate, +// &mut self.ephemeral.solution_storage, +// ); +// if self.handle_locals_maybe_decide()? 
{ +// return Ok(()); +// } +// } +// Some(PolyId::P { index }) => { +// // Message for protocol actor +// let poly_p = &mut self.ephemeral.poly_ps[*index]; -impl MonoContext for MonoPContext<'_> { - type D = ProtocolD; - type S = ProtocolS; - fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S) { - log!( - &mut self.inner.logger, - "!! MonoContext callback to new_component with ports {:?}!", - &moved_ports, - ); - if moved_ports.is_subset(self.ports) { - self.ports.retain(|x| !moved_ports.contains(x)); - self.mono_ps.push(MonoP { state: init_state, ports: moved_ports }); - } else { - panic!("MachineP attempting to move alien port!"); - } - } - fn new_channel(&mut self) -> [PortId; 2] { - let [a, b] = Endpoint::new_memory_pair(); - let channel_id = self.inner.channel_id_stream.next(); +// let m_ctx = PolyPContext { +// my_subtree_id: SubtreeId::PolyP { index: *index }, +// inner: &mut self.inner, +// solution_storage: &mut self.ephemeral.solution_storage, +// }; +// use SyncRunResult as Srr; +// let blocker = poly_p.poly_recv_run( +// m_ctx, +// &self.protocol_description, +// received.recipient, +// payload_predicate, +// payload, +// )?; +// log!( +// &mut self.inner.logger, +// "... Fed the msg to PolyP {:?} and ran it to blocker {:?}", +// subtree_id, +// blocker +// ); +// match blocker { +// Srr::NoBranches => return Err(SyncError::Inconsistent), +// Srr::BlockingForRecv | Srr::AllBranchesComplete => { +// { +// let peeked = self +// .ephemeral +// .solution_storage +// .peek_new_locals() +// .collect::>(); +// log!( +// &mut self.inner.logger, +// "Got {} new controller-local solutions from RECV: {:?}", +// peeked.len(), +// peeked +// ); +// } +// if self.handle_locals_maybe_decide()? 
{ +// return Ok(()); +// } +// } +// } +// } +// }; +// } +// } +// } +// } +// } +// impl ControllerEphemeral { +// fn is_clear(&self) -> bool { +// self.solution_storage.is_clear() +// && self.poly_n.is_none() +// && self.poly_ps.is_empty() +// && self.mono_ps.is_empty() +// && self.port_to_holder.is_empty() +// } +// fn clear(&mut self) { +// self.solution_storage.clear(); +// self.poly_n.take(); +// self.poly_ps.clear(); +// self.port_to_holder.clear(); +// } +// } +// impl Into for MonoP { +// fn into(self) -> PolyP { +// PolyP { +// complete: Default::default(), +// incomplete: hashmap! { +// Predicate::new_trivial() => +// BranchP { +// state: self.state, +// inbox: Default::default(), +// outbox: Default::default(), +// blocking_on: None, +// } +// }, +// ports: self.ports, +// } +// } +// } - let mut clos = |endpoint, polarity| { - let endpoint_ext = - EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint }; - let port = self.inner.endpoint_exts.alloc(endpoint_ext); - let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint; - let token = PortId::to_token(port); - self.inner - .messenger_state - .poll - .register(endpoint, token, Ready::readable(), PollOpt::edge()) - .expect("AAGAGGGGG"); - self.ports.insert(port); - port - }; - let [kp, kg] = [clos(a, Putter), clos(b, Getter)]; - log!( - &mut self.inner.logger, - "!! MonoContext callback to new_channel. returning ports {:?}!", - [kp, kg], - ); - [kp, kg] - } - fn new_random(&mut self) -> u64 { - type Bytes8 = [u8; std::mem::size_of::()]; - let mut bytes = Bytes8::default(); - getrandom::getrandom(&mut bytes).unwrap(); - let val = unsafe { std::mem::transmute::(bytes) }; - log!( - &mut self.inner.logger, - "!! MonoContext callback to new_random. 
returning val {:?}!", - val, - ); - val - } -} +// impl From for SyncError { +// fn from(e: EndpointError) -> SyncError { +// SyncError::EndpointError(e) +// } +// } -impl SolutionStorage { - fn is_clear(&self) -> bool { - self.subtree_id_to_index.is_empty() - && self.subtree_solutions.is_empty() - && self.old_local.is_empty() - && self.new_local.is_empty() - } - fn clear(&mut self) { - self.subtree_id_to_index.clear(); - self.subtree_solutions.clear(); - self.old_local.clear(); - self.new_local.clear(); - } - pub(crate) fn reset(&mut self, subtree_ids: impl Iterator) { - self.subtree_id_to_index.clear(); - self.subtree_solutions.clear(); - self.old_local.clear(); - self.new_local.clear(); - for key in subtree_ids { - self.subtree_id_to_index.insert(key, self.subtree_solutions.len()); - self.subtree_solutions.push(Default::default()) - } - } +// impl ProtoSyncContext<'_> { +// fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S) { +// todo!() +// } +// fn new_channel(&mut self) -> [PortId; 2] { +// todo!() +// } +// } - pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { - self.new_local.iter() - } +// impl SolutionStorage { +// fn is_clear(&self) -> bool { +// self.subtree_id_to_index.is_empty() +// && self.subtree_solutions.is_empty() +// && self.old_local.is_empty() +// && self.new_local.is_empty() +// } +// fn clear(&mut self) { +// self.subtree_id_to_index.clear(); +// self.subtree_solutions.clear(); +// self.old_local.clear(); +// self.new_local.clear(); +// } +// pub(crate) fn reset(&mut self, subtree_ids: impl Iterator) { +// self.subtree_id_to_index.clear(); +// self.subtree_solutions.clear(); +// self.old_local.clear(); +// self.new_local.clear(); +// for key in subtree_ids { +// self.subtree_id_to_index.insert(key, self.subtree_solutions.len()); +// self.subtree_solutions.push(Default::default()) +// } +// } - pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { - let Self { old_local, new_local, .. 
} = self; - new_local.drain().map(move |local| { - old_local.insert(local.clone()); - local - }) - } +// pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { +// self.new_local.iter() +// } - pub(crate) fn submit_and_digest_subtree_solution( - &mut self, - logger: &mut String, - subtree_id: SubtreeId, - predicate: Predicate, - ) { - log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate); - let index = self.subtree_id_to_index[&subtree_id]; - let left = 0..index; - let right = (index + 1)..self.subtree_solutions.len(); +// pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { +// let Self { old_local, new_local, .. } = self; +// new_local.drain().map(move |local| { +// old_local.insert(local.clone()); +// local +// }) +// } - let Self { subtree_solutions, new_local, old_local, .. } = self; - let was_new = subtree_solutions[index].insert(predicate.clone()); - if was_new { - let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); - Self::elaborate_into_new_local_rec( - logger, - predicate, - set_visitor, - old_local, - new_local, - ); - } - } +// pub(crate) fn submit_and_digest_subtree_solution( +// &mut self, +// logger: &mut String, +// subtree_id: SubtreeId, +// predicate: Predicate, +// ) { +// log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate); +// let index = self.subtree_id_to_index[&subtree_id]; +// let left = 0..index; +// let right = (index + 1)..self.subtree_solutions.len(); - fn elaborate_into_new_local_rec<'a, 'b>( - logger: &mut String, - partial: Predicate, - mut set_visitor: impl Iterator> + Clone, - old_local: &'b HashSet, - new_local: &'a mut HashSet, - ) { - if let Some(set) = set_visitor.next() { - // incomplete solution. 
keep traversing - for pred in set.iter() { - if let Some(elaborated) = pred.union_with(&partial) { - Self::elaborate_into_new_local_rec( - logger, - elaborated, - set_visitor.clone(), - old_local, - new_local, - ) - } - } - } else { - // recursive stop condition. `partial` is a local subtree solution - if !old_local.contains(&partial) { - // ... and it hasn't been found before - log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial); - new_local.insert(partial); - } - } - } -} -impl PolyContext for BranchPContext<'_, '_> { - type D = ProtocolD; +// let Self { subtree_solutions, new_local, old_local, .. } = self; +// let was_new = subtree_solutions[index].insert(predicate.clone()); +// if was_new { +// let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); +// Self::elaborate_into_new_local_rec( +// logger, +// predicate, +// set_visitor, +// old_local, +// new_local, +// ); +// } +// } - fn is_firing(&mut self, port: PortId) -> Option { - assert!(self.ports.contains(&port)); - let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; - let val = self.predicate.query(channel_id); - log!( - &mut self.m_ctx.inner.logger, - "!! PolyContext callback to is_firing by {:?}! returning {:?}", - self.m_ctx.my_subtree_id, - val, - ); - val - } - fn read_msg(&mut self, port: PortId) -> Option<&Payload> { - assert!(self.ports.contains(&port)); - let val = self.inbox.get(&port); - log!( - &mut self.m_ctx.inner.logger, - "!! PolyContext callback to read_msg by {:?}! returning {:?}", - self.m_ctx.my_subtree_id, - val, - ); - val - } -} +// fn elaborate_into_new_local_rec<'a, 'b>( +// logger: &mut String, +// partial: Predicate, +// mut set_visitor: impl Iterator> + Clone, +// old_local: &'b HashSet, +// new_local: &'a mut HashSet, +// ) { +// if let Some(set) = set_visitor.next() { +// // incomplete solution. 
keep traversing +// for pred in set.iter() { +// if let Some(elaborated) = pred.union_with(&partial) { +// Self::elaborate_into_new_local_rec( +// logger, +// elaborated, +// set_visitor.clone(), +// old_local, +// new_local, +// ) +// } +// } +// } else { +// // recursive stop condition. `partial` is a local subtree solution +// if !old_local.contains(&partial) { +// // ... and it hasn't been found before +// log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial); +// new_local.insert(partial); +// } +// } +// } +// } +// impl PolyContext for BranchPContext<'_, '_> { +// type D = ProtocolD; + +// fn is_firing(&mut self, port: PortId) -> Option { +// assert!(self.ports.contains(&port)); +// let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; +// let val = self.predicate.query(channel_id); +// log!( +// &mut self.m_ctx.inner.logger, +// "!! PolyContext callback to is_firing by {:?}! returning {:?}", +// self.m_ctx.my_subtree_id, +// val, +// ); +// val +// } +// fn read_msg(&mut self, port: PortId) -> Option<&Payload> { +// assert!(self.ports.contains(&port)); +// let val = self.inbox.get(&port); +// log!( +// &mut self.m_ctx.inner.logger, +// "!! PolyContext callback to read_msg by {:?}! 
returning {:?}", +// self.m_ctx.my_subtree_id, +// val, +// ); +// val +// } +// } diff --git a/src/runtime/error.rs b/src/runtime/error.rs new file mode 100644 index 0000000000000000000000000000000000000000..0072ca449f2d0d940ad49d0341c745f5c40cf978 --- /dev/null +++ b/src/runtime/error.rs @@ -0,0 +1,15 @@ +use crate::common::*; + +pub enum EndpointError { + MalformedMessage, + BrokenEndpoint, +} +pub enum TryRecyAnyError { + Timeout, + PollFailed, + EndpointError { error: EndpointError, index: usize }, + BrokenEndpoint(usize), +} +pub enum SyncError { + Timeout, +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 20d756b80a66a1c6ff7ea04e8276039df12f1443..327b5ffcdd162e0dcff5664b368d52f6c5dd9443 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,59 +1,59 @@ -// #[cfg(feature = "ffi")] -// pub mod ffi; +mod communication; +mod error; +mod setup2; -// mod actors; -// pub(crate) mod communication; -// pub(crate) mod connector; -// pub(crate) mod endpoint; -// pub mod errors; -// mod serde; +#[cfg(test)] mod my_tests; -mod setup2; -// pub(crate) mod setup; -// mod v2; use crate::common::*; -// use actors::*; -// use endpoint::*; -// use errors::*; +use error::*; +#[derive(Clone, Copy, Debug)] +pub enum LocalComponentId { + Native, + Proto { index: usize }, +} +#[derive(Debug, Clone, Copy)] +pub enum Route { + LocalComponent(LocalComponentId), + Endpoint { index: usize }, +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct MyPortInfo { + polarity: Polarity, + port: PortId, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub(crate) enum Decision { +pub enum Decision { Failure, Success(Predicate), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub(crate) enum Msg { +pub enum Msg { SetupMsg(SetupMsg), CommMsg(CommMsg), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct MyPortInfo { - polarity: Polarity, - port: PortId, -} -#[derive(Clone, Debug, serde::Serialize, 
serde::Deserialize)] -pub(crate) enum SetupMsg { - // sent by the passive endpoint to the active endpoint - // MyPortInfo(MyPortInfo), +pub enum SetupMsg { + MyPortInfo(MyPortInfo), LeaderEcho { maybe_leader: ControllerId }, LeaderAnnounce { leader: ControllerId }, YouAreMyParent, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub(crate) struct CommMsg { +pub struct CommMsg { pub round_index: usize, pub contents: CommMsgContents, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub(crate) enum CommMsgContents { +pub enum CommMsgContents { SendPayload { payload_predicate: Predicate, payload: Payload }, Elaborate { partial_oracle: Predicate }, // SINKWARD Failure, // SINKWARD Announce { decision: Decision }, // SINKAWAYS } #[derive(Debug, PartialEq)] -pub(crate) enum CommonSatResult { +pub enum CommonSatResult { FormerNotLatter, LatterNotFormer, Equivalent, @@ -78,19 +78,12 @@ pub struct ProtoComponent { state: ComponentState, ports: HashSet, } -#[derive(Debug)] -pub enum InpRoute { - NativeComponent, - ProtoComponent { index: usize }, - Endpoint { index: usize }, -} pub trait Logger: Debug { fn line_writer(&mut self) -> &mut dyn std::fmt::Write; fn dump_log(&self, w: &mut dyn std::io::Write); } #[derive(Debug, Clone)] pub struct EndpointSetup { - pub polarity: Polarity, pub sock_addr: SocketAddr, pub is_active: bool, } @@ -116,11 +109,17 @@ pub struct EndpointManager { // 2. 
Events is empty poll: Poll, events: Events, - undrained_endpoints: IndexSet, + polled_undrained: IndexSet, delayed_messages: Vec<(usize, Msg)>, undelayed_messages: Vec<(usize, Msg)>, endpoint_exts: Vec, } +#[derive(Debug, Default)] +pub struct PortInfo { + polarities: HashMap, + peers: HashMap, + routes: HashMap, +} #[derive(Debug)] pub struct Connector { logger: Box, @@ -128,8 +127,7 @@ pub struct Connector { id_manager: IdManager, native_ports: HashSet, proto_components: Vec, - outp_to_inp: HashMap, - inp_to_route: HashMap, + port_info: PortInfo, phased: ConnectorPhased, } #[derive(Debug)] @@ -142,77 +140,147 @@ pub enum ConnectorPhased { endpoint_manager: EndpointManager, neighborhood: Neighborhood, mem_inbox: Vec, + native_actor: NativeActor, // sync invariant: in Nonsync state }, } #[derive(Debug)] pub struct StringLogger(ControllerId, String); -#[derive(Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -pub(crate) struct Predicate { +#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +pub struct Predicate { pub assigned: BTreeMap, } -#[derive(Debug, Default)] -struct SyncBatch { - puts: HashMap, - gets: HashSet, -} pub struct MonitoredReader { bytes: usize, r: R, } -pub enum EndpointRecvErr { - MalformedMessage, - BrokenEndpoint, -} -pub struct SyncContext<'a> { +pub struct SyncProtoContext<'a> { connector: &'a mut Connector, + proto_component_index: usize, } -pub struct NonsyncContext<'a> { +pub struct NonsyncProtoContext<'a> { connector: &'a mut Connector, + proto_component_index: usize, } -enum TryRecyAnyError { - Timeout, - PollFailed, - EndpointRecvErr { error: EndpointRecvErr, index: usize }, - BrokenEndpoint(usize), + +// pub struct MonoPContext<'a> { +// inner: &'a mut ControllerInner, +// ports: &'a mut HashSet, +// mono_ps: &'a mut Vec, +// } +// pub struct PolyPContext<'a> { +// my_subtree_id: SubtreeId, +// inner: &'a mut Connector, +// solution_storage: &'a mut SolutionStorage, +// } +// impl 
PolyPContext<'_> { +// #[inline(always)] +// fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { +// let Self { solution_storage, my_subtree_id, inner } = self; +// PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } +// } +// } +// struct BranchPContext<'m, 'r> { +// m_ctx: PolyPContext<'m>, +// ports: &'r HashSet, +// predicate: &'r Predicate, +// inbox: &'r HashMap, +// } + +#[derive(Default)] +pub struct SolutionStorage { + old_local: HashSet, + new_local: HashSet, + // this pair acts as SubtreeId -> HashSet which is friendlier to iteration + subtree_solutions: Vec>, + subtree_id_to_index: HashMap, +} +#[derive(Debug)] +pub enum SyncRunResult { + BlockingForRecv, + AllBranchesComplete, + NoBranches, +} +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub enum PolyId { + N, + P { index: usize }, +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub enum SubtreeId { + PolyN, + PolyP { index: usize }, + ChildController { port: PortId }, } +#[derive(Debug, Default)] +pub struct SyncBatch { + to_put: HashMap, + to_get: HashSet, +} +#[derive(Debug)] +pub enum NativeActor { + Nonsync { + sync_result_branch: Option, // invariant: sync_result_branch.to_get.is_empty() + next_batches: Vec, // invariant: nonempty + }, + Sync { + branches: HashMap, + }, +} +#[derive(Debug)] +pub struct NativeBranch { + batch_index: usize, + gotten: HashMap, + to_get: HashSet, +} + //////////////// impl EndpointManager { fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { self.endpoint_exts[index].endpoint.send(msg) } - fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> { + fn try_recv_any( + &mut self, + logger: &mut dyn Logger, + deadline: Instant, + ) -> Result<(usize, Msg), TryRecyAnyError> { use TryRecyAnyError::*; // 1. try messages already buffered if let Some(x) = self.undelayed_messages.pop() { return Ok(x); } - // 2. 
try read from sockets nonblocking - while let Some(index) = self.undrained_endpoints.pop() { - if let Some(msg) = self.endpoint_exts[index] - .endpoint - .try_recv() - .map_err(|error| EndpointRecvErr { error, index })? - { - return Ok((index, msg)); - } - } - // 3. poll for progress + loop { + // 2. try read a message from an endpoint that previously raised an event + while let Some(index) = self.polled_undrained.pop() { + let endpoint = &mut self.endpoint_exts[index].endpoint; + if let Some(msg) = + endpoint.try_recv().map_err(|error| EndpointError { error, index })? 
- { - return Ok((index, msg)); - } + self.polled_undrained.insert(index); } } } fn undelay_all(&mut self) { + if self.undelayed_messages.is_empty() { + // fast path + std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages); + return; + } + // slow path self.undelayed_messages.extend(self.delayed_messages.drain(..)); } } @@ -221,22 +289,6 @@ impl Debug for Endpoint { f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() } } -impl NonsyncContext<'_> { - pub fn new_component(&mut self, moved_ports: HashSet, init_state: ComponentState) { - todo!() - } - pub fn new_channel(&mut self) -> [PortId; 2] { - todo!() - } -} -impl SyncContext<'_> { - pub fn is_firing(&mut self, port: PortId) -> Option { - todo!() - } - pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { - todo!() - } -} impl From for MonitoredReader { fn from(r: R) -> Self { Self { r, bytes: 0 } @@ -259,15 +311,6 @@ impl Into for SetupMsg { Msg::SetupMsg(self) } } -impl Debug for Predicate { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.pad("{")?; - for (port, &v) in self.assigned.iter() { - f.write_fmt(format_args!("{:?}=>{}, ", port, if v { 'T' } else { 'F' }))? 
- } - f.pad("}") - } -} impl StringLogger { pub fn new(controller_id: ControllerId) -> Self { Self(controller_id, String::default()) @@ -308,8 +351,8 @@ impl IdManager { } } impl Endpoint { - fn try_recv(&mut self) -> Result, EndpointRecvErr> { - use EndpointRecvErr::*; + fn try_recv(&mut self) -> Result, EndpointError> { + use EndpointError::*; // populate inbox as much as possible 'read_loop: loop { match self.stream.read_to_end(&mut self.inbox) { @@ -357,518 +400,133 @@ impl Connector { writeln!(lock, "DEBUG_PRINT:\n{:#?}\n", self).unwrap(); } } +impl Debug for SolutionStorage { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.pad("Solutions: [")?; + for (subtree_id, &index) in self.subtree_id_to_index.iter() { + let sols = &self.subtree_solutions[index]; + f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; + } + f.pad("]") + } +} -// #[derive(Debug)] -// pub enum Connector { -// Unconfigured(Unconfigured), -// Configured(Configured), -// Connected(Connected), // TODO consider boxing. currently takes up a lot of stack space -// } -// #[derive(Debug)] -// pub struct Unconfigured { -// pub controller_id: ControllerId, -// } -// #[derive(Debug)] -// pub struct Configured { -// controller_id: ControllerId, -// polarities: Vec, -// bindings: HashMap, -// protocol_description: Arc, -// main_component: Vec, -// logger: String, -// } -// #[derive(Debug)] -// pub struct Connected { -// native_interface: Vec<(PortId, Polarity)>, -// sync_batches: Vec, -// // controller is cooperatively scheduled with the native application -// // (except for transport layer behind Endpoints, which are managed by the OS) -// // control flow is passed to the controller during methods on Connector (primarily, connect and sync). 
-// controller: Controller, -// } - -// #[derive(Debug, Copy, Clone)] -// pub enum PortBinding { -// Native, -// Active(SocketAddr), -// Passive(SocketAddr), -// } - -// #[derive(Debug)] -// struct Arena { -// storage: Vec, -// } - -// #[derive(Debug)] -// struct ReceivedMsg { -// recipient: PortId, -// msg: Msg, -// } - -// #[derive(Debug)] -// struct MessengerState { -// poll: Poll, -// events: Events, -// delayed: Vec, -// undelayed: Vec, -// polled_undrained: IndexSet, -// } -// #[derive(Debug)] -// struct ChannelIdStream { -// controller_id: ControllerId, -// next_channel_index: ChannelIndex, -// } - -// #[derive(Debug)] -// struct Controller { -// protocol_description: Arc, -// inner: ControllerInner, -// ephemeral: ControllerEphemeral, -// unrecoverable_error: Option, // prevents future calls to Sync -// } -// #[derive(Debug)] -// struct ControllerInner { -// round_index: usize, -// channel_id_stream: ChannelIdStream, -// endpoint_exts: Arena, -// messenger_state: MessengerState, -// mono_n: MonoN, // state at next round start -// mono_ps: Vec, // state at next round start -// family: ControllerFamily, -// logger: String, -// } - -// /// This structure has its state entirely reset between synchronous rounds -// #[derive(Debug, Default)] -// struct ControllerEphemeral { -// solution_storage: SolutionStorage, -// poly_n: Option, -// poly_ps: Vec, -// mono_ps: Vec, -// port_to_holder: HashMap, -// } - -// #[derive(Debug)] -// struct ControllerFamily { -// parent_port: Option, -// children_ports: Vec, -// } - -// #[derive(Debug)] -// pub(crate) enum SyncRunResult { -// BlockingForRecv, -// AllBranchesComplete, -// NoBranches, -// } - -// // Used to identify poly actors -// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -// enum PolyId { -// N, -// P { index: usize }, -// } - -// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -// pub(crate) enum SubtreeId { -// PolyN, -// PolyP { index: usize }, -// ChildController { 
port: PortId }, -// } - -// pub(crate) struct MonoPContext<'a> { -// inner: &'a mut ControllerInner, -// ports: &'a mut HashSet, -// mono_ps: &'a mut Vec, -// } -// pub(crate) struct PolyPContext<'a> { -// my_subtree_id: SubtreeId, -// inner: &'a mut ControllerInner, -// solution_storage: &'a mut SolutionStorage, -// } -// impl PolyPContext<'_> { -// #[inline(always)] -// fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { -// let Self { solution_storage, my_subtree_id, inner } = self; -// PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } -// } -// } -// struct BranchPContext<'m, 'r> { -// m_ctx: PolyPContext<'m>, -// ports: &'r HashSet, -// predicate: &'r Predicate, -// inbox: &'r HashMap, -// } - -// #[derive(Default)] -// pub(crate) struct SolutionStorage { -// old_local: HashSet, -// new_local: HashSet, -// // this pair acts as SubtreeId -> HashSet which is friendlier to iteration -// subtree_solutions: Vec>, -// subtree_id_to_index: HashMap, -// } - -// trait Messengerlike { -// fn get_state_mut(&mut self) -> &mut MessengerState; -// fn get_endpoint_mut(&mut self, eport: PortId) -> &mut Endpoint; - -// fn delay(&mut self, received: ReceivedMsg) { -// self.get_state_mut().delayed.push(received); -// } -// fn undelay_all(&mut self) { -// let MessengerState { delayed, undelayed, .. 
} = self.get_state_mut(); -// undelayed.extend(delayed.drain(..)) -// } - -// fn send(&mut self, to: PortId, msg: Msg) -> Result<(), EndpointErr> { -// self.get_endpoint_mut(to).send(msg) -// } - -// // attempt to receive a message from one of the endpoints before the deadline -// fn recv(&mut self, deadline: Instant) -> Result, MessengerRecvErr> { -// // try get something buffered -// if let Some(x) = self.get_state_mut().undelayed.pop() { -// return Ok(Some(x)); -// } - -// loop { -// // polled_undrained may not be empty -// while let Some(eport) = self.get_state_mut().polled_undrained.pop() { -// if let Some(msg) = self -// .get_endpoint_mut(eport) -// .recv() -// .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? -// { -// // this endpoint MAY still have messages! check again in future -// self.get_state_mut().polled_undrained.insert(eport); -// return Ok(Some(ReceivedMsg { recipient: eport, msg })); -// } -// } - -// let state = self.get_state_mut(); -// match state.poll_events(deadline) { -// Ok(()) => { -// for e in state.events.iter() { -// state.polled_undrained.insert(PortId::from_token(e.token())); -// } -// } -// Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), -// Err(PollDeadlineErr::Timeout) => return Ok(None), -// } -// } -// } -// fn recv_blocking(&mut self) -> Result { -// // try get something buffered -// if let Some(x) = self.get_state_mut().undelayed.pop() { -// return Ok(x); -// } - -// loop { -// // polled_undrained may not be empty -// while let Some(eport) = self.get_state_mut().polled_undrained.pop() { -// if let Some(msg) = self -// .get_endpoint_mut(eport) -// .recv() -// .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? -// { -// // this endpoint MAY still have messages! 
check again in future -// self.get_state_mut().polled_undrained.insert(eport); -// return Ok(ReceivedMsg { recipient: eport, msg }); -// } -// } - -// let state = self.get_state_mut(); - -// state -// .poll -// .poll(&mut state.events, None) -// .map_err(|_| MessengerRecvErr::PollingFailed)?; -// for e in state.events.iter() { -// state.polled_undrained.insert(PortId::from_token(e.token())); -// } -// } -// } -// } - -// ///////////////////////////////// -// impl Debug for SolutionStorage { -// fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { -// f.pad("Solutions: [")?; -// for (subtree_id, &index) in self.subtree_id_to_index.iter() { -// let sols = &self.subtree_solutions[index]; -// f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; -// } -// f.pad("]") -// } -// } -// impl From for SyncErr { -// fn from(e: EvalErr) -> SyncErr { -// SyncErr::EvalErr(e) -// } -// } -// impl From for SyncErr { -// fn from(e: MessengerRecvErr) -> SyncErr { -// SyncErr::MessengerRecvErr(e) -// } -// } -// impl From for ConnectErr { -// fn from(e: MessengerRecvErr) -> ConnectErr { -// ConnectErr::MessengerRecvErr(e) -// } -// } -// impl Default for Arena { -// fn default() -> Self { -// Self { storage: vec![] } -// } -// } -// impl Arena { -// pub fn alloc(&mut self, t: T) -> PortId { -// self.storage.push(t); -// let l: u32 = self.storage.len().try_into().unwrap(); -// PortId::from_raw(l - 1u32) -// } -// pub fn get(&self, key: PortId) -> Option<&T> { -// self.storage.get(key.to_raw() as usize) -// } -// pub fn get_mut(&mut self, key: PortId) -> Option<&mut T> { -// self.storage.get_mut(key.to_raw() as usize) -// } -// pub fn type_convert(self, f: impl FnMut((PortId, T)) -> X) -> Arena { -// Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() } -// } -// pub fn iter(&self) -> impl Iterator { -// self.keyspace().zip(self.storage.iter()) -// } -// pub fn len(&self) -> usize { -// self.storage.len() -// } -// pub fn keyspace(&self) -> impl 
Iterator { -// (0u32..self.storage.len().try_into().unwrap()).map(PortId::from_raw) -// } -// } - -// impl ChannelIdStream { -// fn new(controller_id: ControllerId) -> Self { -// Self { controller_id, next_channel_index: 0 } -// } -// fn next(&mut self) -> ChannelId { -// self.next_channel_index += 1; -// ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 } -// } -// } - -// impl MessengerState { -// // does NOT guarantee that events is non-empty -// fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> { -// use PollDeadlineErr::*; -// self.events.clear(); -// let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; -// self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; -// Ok(()) -// } -// } -// impl From for ConnectErr { -// fn from(e: PollDeadlineErr) -> ConnectErr { -// match e { -// PollDeadlineErr::Timeout => ConnectErr::Timeout, -// PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed, -// } -// } -// } - -// impl std::ops::Not for Polarity { -// type Output = Self; -// fn not(self) -> Self::Output { -// use Polarity::*; -// match self { -// Putter => Getter, -// Getter => Putter, -// } -// } -// } - -// impl Predicate { -// // returns true IFF self.unify would return Equivalent OR FormerNotLatter -// pub fn satisfies(&self, other: &Self) -> bool { -// let mut s_it = self.assigned.iter(); -// let mut s = if let Some(s) = s_it.next() { -// s -// } else { -// return other.assigned.is_empty(); -// }; -// for (oid, ob) in other.assigned.iter() { -// while s.0 < oid { -// s = if let Some(s) = s_it.next() { -// s -// } else { -// return false; -// }; -// } -// if s.0 > oid || s.1 != ob { -// return false; -// } -// } -// true -// } - -// /// Given self and other, two predicates, return the most general Predicate possible, N -// /// such that n.satisfies(self) && n.satisfies(other). -// /// If none exists Nonexistant is returned. 
-// /// If the resulting predicate is equivlanet to self, other, or both, -// /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. -// /// otherwise New(N) is returned. -// pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { -// use CommonSatResult::*; -// // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. -// let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; -// let [mut s, mut o] = [s_it.next(), o_it.next()]; -// // lists of assignments in self but not other and vice versa. -// let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; -// loop { -// match [s, o] { -// [None, None] => break, -// [None, Some(x)] => { -// o_not_s.push(x); -// o_not_s.extend(o_it); -// break; -// } -// [Some(x), None] => { -// s_not_o.push(x); -// s_not_o.extend(s_it); -// break; -// } -// [Some((sid, sb)), Some((oid, ob))] => { -// if sid < oid { -// // o is missing this element -// s_not_o.push((sid, sb)); -// s = s_it.next(); -// } else if sid > oid { -// // s is missing this element -// o_not_s.push((oid, ob)); -// o = o_it.next(); -// } else if sb != ob { -// assert_eq!(sid, oid); -// // both predicates assign the variable but differ on the value -// return Nonexistant; -// } else { -// // both predicates assign the variable to the same value -// s = s_it.next(); -// o = o_it.next(); -// } -// } -// } -// } -// // Observed zero inconsistencies. A unified predicate exists... -// match [s_not_o.is_empty(), o_not_s.is_empty()] { -// [true, true] => Equivalent, // ... equivalent to both. -// [false, true] => FormerNotLatter, // ... equivalent to self. -// [true, false] => LatterNotFormer, // ... equivalent to other. -// [false, false] => { -// // ... which is the union of the predicates' assignments but -// // is equivalent to neither self nor other. 
-// let mut new = self.clone(); -// for (&id, &b) in o_not_s { -// new.assigned.insert(id, b); -// } -// New(new) -// } -// } -// } - -// pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { -// self.assigned -// .iter() -// .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) -// } - -// pub fn batch_assign_nones( -// &mut self, -// channel_ids: impl Iterator, -// value: bool, -// ) { -// for channel_id in channel_ids { -// self.assigned.entry(channel_id).or_insert(value); -// } -// } -// pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option { -// self.assigned.insert(channel_id, value) -// } -// pub fn union_with(&self, other: &Self) -> Option { -// let mut res = self.clone(); -// for (&channel_id, &assignment_1) in other.assigned.iter() { -// match res.assigned.insert(channel_id, assignment_1) { -// Some(assignment_2) if assignment_1 != assignment_2 => return None, -// _ => {} -// } -// } -// Some(res) -// } -// pub fn query(&self, x: ChannelId) -> Option { -// self.assigned.get(&x).copied() -// } -// pub fn new_trivial() -> Self { -// Self { assigned: Default::default() } -// } -// } - -// #[test] -// fn pred_sat() { -// use maplit::btreemap; -// let mut c = ChannelIdStream::new(0); -// let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); -// let p = Predicate::new_trivial(); -// let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; -// let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; -// let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; -// let p_0f_3t = Predicate { assigned: btreemap! 
{ ch[0] => false, ch[3] => true } }; - -// assert!(p.satisfies(&p)); -// assert!(p_0t.satisfies(&p_0t)); -// assert!(p_0f.satisfies(&p_0f)); -// assert!(p_0f_3f.satisfies(&p_0f_3f)); -// assert!(p_0f_3t.satisfies(&p_0f_3t)); - -// assert!(p_0t.satisfies(&p)); -// assert!(p_0f.satisfies(&p)); -// assert!(p_0f_3f.satisfies(&p_0f)); -// assert!(p_0f_3t.satisfies(&p_0f)); - -// assert!(!p.satisfies(&p_0t)); -// assert!(!p.satisfies(&p_0f)); -// assert!(!p_0f.satisfies(&p_0t)); -// assert!(!p_0t.satisfies(&p_0f)); -// assert!(!p_0f_3f.satisfies(&p_0f_3t)); -// assert!(!p_0f_3t.satisfies(&p_0f_3f)); -// assert!(!p_0t.satisfies(&p_0f_3f)); -// assert!(!p_0f.satisfies(&p_0f_3f)); -// assert!(!p_0t.satisfies(&p_0f_3t)); -// assert!(!p_0f.satisfies(&p_0f_3t)); -// } - -// #[test] -// fn pred_common_sat() { -// use maplit::btreemap; -// use CommonSatResult::*; - -// let mut c = ChannelIdStream::new(0); -// let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); -// let p = Predicate::new_trivial(); -// let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; -// let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; -// let p_3f = Predicate { assigned: btreemap! { ch[3] => false } }; -// let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; -// let p_0f_3t = Predicate { assigned: btreemap! 
{ ch[0] => false, ch[3] => true } }; - -// assert_eq![p.common_satisfier(&p), Equivalent]; -// assert_eq![p_0t.common_satisfier(&p_0t), Equivalent]; +impl Predicate { + // returns true IFF self.unify would return Equivalent OR FormerNotLatter + pub fn satisfies(&self, other: &Self) -> bool { + let mut s_it = self.assigned.iter(); + let mut s = if let Some(s) = s_it.next() { + s + } else { + return other.assigned.is_empty(); + }; + for (oid, ob) in other.assigned.iter() { + while s.0 < oid { + s = if let Some(s) = s_it.next() { + s + } else { + return false; + }; + } + if s.0 > oid || s.1 != ob { + return false; + } + } + true + } -// assert_eq![p.common_satisfier(&p_0t), LatterNotFormer]; -// assert_eq![p_0t.common_satisfier(&p), FormerNotLatter]; + /// Given self and other, two predicates, return the most general Predicate possible, N + /// such that n.satisfies(self) && n.satisfies(other). + /// If none exists Nonexistant is returned. + /// If the resulting predicate is equivlanet to self, other, or both, + /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. + /// otherwise New(N) is returned. + pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { + use CommonSatResult::*; + // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. + let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; + let [mut s, mut o] = [s_it.next(), o_it.next()]; + // lists of assignments in self but not other and vice versa. 
+ let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; + loop { + match [s, o] { + [None, None] => break, + [None, Some(x)] => { + o_not_s.push(x); + o_not_s.extend(o_it); + break; + } + [Some(x), None] => { + s_not_o.push(x); + s_not_o.extend(s_it); + break; + } + [Some((sid, sb)), Some((oid, ob))] => { + if sid < oid { + // o is missing this element + s_not_o.push((sid, sb)); + s = s_it.next(); + } else if sid > oid { + // s is missing this element + o_not_s.push((oid, ob)); + o = o_it.next(); + } else if sb != ob { + assert_eq!(sid, oid); + // both predicates assign the variable but differ on the value + return Nonexistant; + } else { + // both predicates assign the variable to the same value + s = s_it.next(); + o = o_it.next(); + } + } + } + } + // Observed zero inconsistencies. A unified predicate exists... + match [s_not_o.is_empty(), o_not_s.is_empty()] { + [true, true] => Equivalent, // ... equivalent to both. + [false, true] => FormerNotLatter, // ... equivalent to self. + [true, false] => LatterNotFormer, // ... equivalent to other. + [false, false] => { + // ... which is the union of the predicates' assignments but + // is equivalent to neither self nor other. 
+ let mut new = self.clone(); + for (&id, &b) in o_not_s { + new.assigned.insert(id, b); + } + New(new) + } + } + } -// assert_eq![p_0t.common_satisfier(&p_0f), Nonexistant]; -// assert_eq![p_0f_3t.common_satisfier(&p_0f_3f), Nonexistant]; -// assert_eq![p_0f_3t.common_satisfier(&p_3f), Nonexistant]; -// assert_eq![p_3f.common_satisfier(&p_0f_3t), Nonexistant]; + pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { + self.assigned + .iter() + .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) + } -// assert_eq![p_0f.common_satisfier(&p_3f), New(p_0f_3f)]; -// } + pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator, value: bool) { + for channel_id in channel_ids { + self.assigned.entry(channel_id).or_insert(value); + } + } + pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option { + self.assigned.insert(channel_id, value) + } + pub fn union_with(&self, other: &Self) -> Option { + let mut res = self.clone(); + for (&channel_id, &assignment_1) in other.assigned.iter() { + match res.assigned.insert(channel_id, assignment_1) { + Some(assignment_2) if assignment_1 != assignment_2 => return None, + _ => {} + } + } + Some(res) + } + pub fn query(&self, x: PortId) -> Option { + self.assigned.get(&x).copied() + } + pub fn new_trivial() -> Self { + Self { assigned: Default::default() } + } +} diff --git a/src/runtime/my_tests.rs b/src/runtime/my_tests.rs index 998243c8139cec175f1f91415ebdc6cff08e4975..40d099a473fe58eebb3950ed6259e7e4fd510fd2 100644 --- a/src/runtime/my_tests.rs +++ b/src/runtime/my_tests.rs @@ -45,9 +45,8 @@ fn add_sync() { fn add_net_port() { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); let sock_addr = next_test_addr(); - let _ = - c.add_net_port(EndpointSetup { polarity: Getter, sock_addr, is_active: false }).unwrap(); - let _ = c.add_net_port(EndpointSetup { polarity: Putter, sock_addr, is_active: true }).unwrap(); + let _ = c.add_net_port(Getter, 
EndpointSetup { sock_addr, is_active: false }).unwrap(); + let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); println!("{:#?}", c); } @@ -62,12 +61,12 @@ fn trivial_connect() { fn single_node_connect() { let sock_addr = next_test_addr(); let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - let _ = - c.add_net_port(EndpointSetup { polarity: Getter, sock_addr, is_active: false }).unwrap(); - let _ = c.add_net_port(EndpointSetup { polarity: Putter, sock_addr, is_active: true }).unwrap(); - c.connect(Duration::from_secs(1)).unwrap(); + let _ = c.add_net_port(Getter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + let res = c.connect(Duration::from_secs(1)); println!("{:#?}", c); c.get_logger().dump_log(&mut std::io::stdout().lock()); + res.unwrap(); } #[test] @@ -76,15 +75,13 @@ fn multithreaded_connect() { scope(|s| { s.spawn(|_| { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); - let es = EndpointSetup { polarity: Getter, sock_addr, is_active: true }; - let _ = c.add_net_port(es).unwrap(); + let _ = c.add_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); c.connect(Duration::from_secs(1)).unwrap(); c.print_state(); }); s.spawn(|_| { let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); - let es = EndpointSetup { polarity: Putter, sock_addr, is_active: false }; - let _ = c.add_net_port(es).unwrap(); + let _ = c.add_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); c.connect(Duration::from_secs(1)).unwrap(); c.print_state(); }); diff --git a/src/runtime/actors.rs b/src/runtime/retired/actors.rs similarity index 100% rename from src/runtime/actors.rs rename to src/runtime/retired/actors.rs diff --git a/src/runtime/retired/communication.rs b/src/runtime/retired/communication.rs new file mode 100644 index 
0000000000000000000000000000000000000000..e619b0712bbc97002d4b7cb762fd5e264fafdb49 --- /dev/null +++ b/src/runtime/retired/communication.rs @@ -0,0 +1,739 @@ +use crate::common::*; +use crate::runtime::{actors::*, endpoint::*, errors::*, *}; + +impl Controller { + fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> { + log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); + let ret = match &decision { + Decision::Success(predicate) => { + // overwrite MonoN/P + self.inner.mono_n = { + let poly_n = self.ephemeral.poly_n.take().unwrap(); + poly_n.choose_mono(predicate).unwrap_or_else(|| { + panic!( + "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}", + &predicate, &poly_n.branches, &self.inner.logger + ); + }) + }; + self.inner.mono_ps.clear(); + self.inner.mono_ps.extend( + self.ephemeral + .poly_ps + .drain(..) + .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), + ); + Ok(()) + } + Decision::Failure => Err(SyncErr::Timeout), + }; + let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); + for &child_port in self.inner.family.children_ports.iter() { + log!( + &mut self.inner.logger, + "Forwarding {:?} to child with port {:?}", + &announcement, + child_port + ); + self.inner + .endpoint_exts + .get_mut(child_port) + .expect("eefef") + .endpoint + .send(announcement.clone())?; + } + self.inner.round_index += 1; + self.ephemeral.clear(); + ret + } + + // Drain self.ephemeral.solution_storage and handle the new locals. 
Return decision if one is found + fn handle_locals_maybe_decide(&mut self) -> Result { + if let Some(parent_port) = self.inner.family.parent_port { + // I have a parent -> I'm not the leader + let parent_endpoint = + &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; + for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { + let msg = + CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); + log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port); + parent_endpoint.send(msg)?; + } + Ok(false) + } else { + // I have no parent -> I'm the leader + assert!(self.inner.family.parent_port.is_none()); + let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); + Ok(if let Some(predicate) = maybe_predicate { + let decision = Decision::Success(predicate); + log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); + self.end_round_with_decision(decision)?; + true + } else { + false + }) + } + } + + fn kick_off_native( + &mut self, + sync_batches: impl Iterator, + ) -> Result { + let MonoN { ports, .. } = self.inner.mono_n.clone(); + let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. 
} = self; + let mut branches = HashMap::<_, _>::default(); + for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { + let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id; + let all_ports = ports.iter().copied(); + let all_channel_ids = all_ports.map(port_to_channel_id); + + let mut predicate = Predicate::new_trivial(); + + // assign TRUE for puts and gets + let true_ports = puts.keys().chain(gets.iter()).copied(); + let true_channel_ids = true_ports.clone().map(port_to_channel_id); + predicate.batch_assign_nones(true_channel_ids, true); + + // assign FALSE for all in interface not assigned true + predicate.batch_assign_nones(all_channel_ids.clone(), false); + + if branches.contains_key(&predicate) { + // TODO what do I do with redundant predicates? + unimplemented!( + "Duplicate predicate {:#?}!\nHaving multiple batches with the same + predicate requires the support of oracle boolean variables", + &predicate, + ) + } + let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; + for (port, payload) in puts { + log!( + &mut self.inner.logger, + "... ... Initial native put msg {:?} pred {:?} batch {:?}", + &payload, + &predicate, + sync_batch_index, + ); + let msg = + CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } + .into_msg(*round_index); + endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; + } + log!( + &mut self.inner.logger, + "... 
Initial native branch batch index={} with pred {:?}", + sync_batch_index, + &predicate + ); + if branch.to_get.is_empty() { + self.ephemeral.solution_storage.submit_and_digest_subtree_solution( + &mut self.inner.logger, + SubtreeId::PolyN, + predicate.clone(), + ); + } + branches.insert(predicate, branch); + } + Ok(PolyN { ports, branches }) + } + pub fn sync_round( + &mut self, + deadline: Option, + sync_batches: Option>, + ) -> Result<(), SyncErr> { + if let Some(e) = self.unrecoverable_error { + return Err(e.clone()); + } + self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e { + SyncErr::Timeout => e, // this isn't unrecoverable + _ => { + // Must set unrecoverable error! and tear down our net channels + self.unrecoverable_error = Some(e); + self.ephemeral.clear(); + self.inner.endpoint_exts = Default::default(); + e + } + }) + } + + // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. + // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. + fn sync_round_inner( + &mut self, + mut deadline: Option, + sync_batches: Option>, + ) -> Result<(), SyncErr> { + log!( + &mut self.inner.logger, + "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", + self.inner.round_index + ); + assert!(self.ephemeral.is_clear()); + assert!(self.unrecoverable_error.is_none()); + + // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). + // Some actors are dropped. some new actors are created. 
+ // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors + self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned()); + log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); + while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { + let mut m_ctx = MonoPContext { + ports: &mut mono_p.ports, + mono_ps: &mut self.ephemeral.mono_ps, + inner: &mut self.inner, + }; + // cross boundary into crate::protocol + let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); + log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); + match blocker { + MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent), + MonoBlocker::ComponentExit => drop(mono_p), + MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()), + } + } + log!( + &mut self.inner.logger, + "Finished running all MonoPs! Have {} PolyPs waiting", + self.ephemeral.poly_ps.len() + ); + + // 3. define the mapping from port -> actor + // this is needed during the event loop to determine which actor + // should receive the incoming message. + // TODO: store and update this mapping rather than rebuilding it each round. + let port_to_holder: HashMap = { + use PolyId::*; + let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N)); + let p = self + .ephemeral + .poly_ps + .iter() + .enumerate() + .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index }))); + n.chain(p).collect() + }; + log!( + &mut self.inner.logger, + "SET OF PolyPs and MonoPs final! port lookup map is {:?}", + &port_to_holder + ); + + // 4. Create the solution storage. it tracks the solutions of "subtrees" + // of the controller in the overlay tree. 
+ self.ephemeral.solution_storage.reset({ + let n = std::iter::once(SubtreeId::PolyN); + let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index }); + let c = self + .inner + .family + .children_ports + .iter() + .map(|&port| SubtreeId::ChildController { port }); + let subtree_id_iter = n.chain(m).chain(c); + log!( + &mut self.inner.logger, + "Solution Storage has subtree Ids: {:?}", + &subtree_id_iter.clone().collect::>() + ); + subtree_id_iter + }); + + // 5. kick off the synchronous round of the native actor if it exists + + log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); + self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { + // using if let because of nested ? operator + // TODO check that there are 1+ branches or NO SOLUTION + let poly_n = self.kick_off_native(sync_batches)?; + log!( + &mut self.inner.logger, + "PolyN kicked off, and has branches with predicates... {:?}", + poly_n.branches.keys().collect::>() + ); + Some(poly_n) + } else { + log!(&mut self.inner.logger, "NO NATIVE COMPONENT"); + None + }; + + // 6. Kick off the synchronous round of each protocol actor + // If just one actor becomes inconsistent now, there can be no solution! + // TODO distinguish between completed and not completed poly_p's? + log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); + for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { + let my_subtree_id = SubtreeId::PolyP { index }; + let m_ctx = PolyPContext { + my_subtree_id, + inner: &mut self.inner, + solution_storage: &mut self.ephemeral.solution_storage, + }; + use SyncRunResult as Srr; + let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; + log!(&mut self.inner.logger, "... 
PolyP's poly_run got blocker {:?}", &blocker); + match blocker { + Srr::NoBranches => return Err(SyncErr::Inconsistent), + Srr::AllBranchesComplete | Srr::BlockingForRecv => (), + } + } + log!(&mut self.inner.logger, "All Poly machines have been kicked off!"); + + // 7. `solution_storage` may have new solutions for this controller + // handle their discovery. LEADER => announce, otherwise => send to parent + { + let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); + log!( + &mut self.inner.logger, + "Got {} controller-local solutions before a single RECV: {:?}", + peeked.len(), + peeked + ); + } + if self.handle_locals_maybe_decide()? { + return Ok(()); + } + + // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error + log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages"); + self.undelay_all(); + 'recv_loop: loop { + log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline); + let received = match deadline { + None => { + // we have personally timed out. perform a "long" poll. + self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP") + } + Some(d) => match self.recv(d)? { + // we have not yet timed out. performed a time-limited poll + Some(received) => received, + None => { + // timed out! send a FAILURE message to the sink, + // and henceforth don't time out on polling. + deadline = None; + match self.inner.family.parent_port { + None => { + // I am the sink! announce failure and return. + return self.end_round_with_decision(Decision::Failure); + } + Some(parent_port) => { + // I am not the sink! send a failure message. 
+ let announcement = Msg::CommMsg(CommMsg { + round_index: self.inner.round_index, + contents: CommMsgContents::Failure, + }); + log!( + &mut self.inner.logger, + "Forwarding {:?} to parent with port {:?}", + &announcement, + parent_port + ); + self.inner + .endpoint_exts + .get_mut(parent_port) + .expect("ss") + .endpoint + .send(announcement.clone())?; + continue; // poll some more + } + } + } + }, + }; + log!(&mut self.inner.logger, "::: message {:?}...", &received); + let current_content = match received.msg { + Msg::SetupMsg(s) => { + // This occurs in the event the connector was malformed during connect() + println!("WASNT EXPECTING {:?}", s); + return Err(SyncErr::UnexpectedSetupMsg); + } + Msg::CommMsg(CommMsg { round_index, .. }) + if round_index < self.inner.round_index => + { + // Old message! Can safely discard + log!(&mut self.inner.logger, "...and its OLD! :("); + drop(received); + continue 'recv_loop; + } + Msg::CommMsg(CommMsg { round_index, .. }) + if round_index > self.inner.round_index => + { + // Message from a next round. Keep for later! + log!(&mut self.inner.logger, "... DELAY! :("); + self.delay(received); + continue 'recv_loop; + } + Msg::CommMsg(CommMsg { contents, round_index }) => { + log!( + &mut self.inner.logger, + "... 
its a round-appropriate CommMsg with port {:?}", + received.recipient + ); + assert_eq!(round_index, self.inner.round_index); + contents + } + }; + match current_content { + CommMsgContents::Failure => match self.inner.family.parent_port { + Some(parent_port) => { + let announcement = Msg::CommMsg(CommMsg { + round_index: self.inner.round_index, + contents: CommMsgContents::Failure, + }); + log!( + &mut self.inner.logger, + "Forwarding {:?} to parent with port {:?}", + &announcement, + parent_port + ); + self.inner + .endpoint_exts + .get_mut(parent_port) + .expect("ss") + .endpoint + .send(announcement.clone())?; + } + None => return self.end_round_with_decision(Decision::Failure), + }, + CommMsgContents::Elaborate { partial_oracle } => { + // Child controller submitted a subtree solution. + if !self.inner.family.children_ports.contains(&received.recipient) { + return Err(SyncErr::ElaborateFromNonChild); + } + let subtree_id = SubtreeId::ChildController { port: received.recipient }; + log!( + &mut self.inner.logger, + "Received elaboration from child for subtree {:?}: {:?}", + subtree_id, + &partial_oracle + ); + self.ephemeral.solution_storage.submit_and_digest_subtree_solution( + &mut self.inner.logger, + subtree_id, + partial_oracle, + ); + if self.handle_locals_maybe_decide()? { + return Ok(()); + } + } + CommMsgContents::Announce { decision } => { + if self.inner.family.parent_port != Some(received.recipient) { + return Err(SyncErr::AnnounceFromNonParent); + } + log!( + &mut self.inner.logger, + "Received ANNOUNCEMENT from from parent {:?}: {:?}", + received.recipient, + &decision + ); + return self.end_round_with_decision(decision); + } + CommMsgContents::SendPayload { payload_predicate, payload } => { + // check that we expect to be able to receive payloads from this sender + assert_eq!( + Getter, + self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity + ); + + // message for some actor. 
Feed it to the appropriate actor + // and then give them another chance to run. + let subtree_id = port_to_holder.get(&received.recipient); + log!( + &mut self.inner.logger, + "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", + subtree_id, + &payload_predicate, + &payload + ); + let channel_id = self + .inner + .endpoint_exts + .get(received.recipient) + .expect("UEHFU") + .info + .channel_id; + if payload_predicate.query(channel_id) != Some(true) { + // sender didn't preserve the invariant + return Err(SyncErr::PayloadPremiseExcludesTheChannel(channel_id)); + } + match subtree_id { + None => { + // this happens when a message is sent to a component that has exited. + // It's safe to drop this message; + // The sender branch will certainly not be part of the solution + } + Some(PolyId::N) => { + // Message for NativeMachine + self.ephemeral.poly_n.as_mut().unwrap().sync_recv( + received.recipient, + &mut self.inner.logger, + payload, + payload_predicate, + &mut self.ephemeral.solution_storage, + ); + if self.handle_locals_maybe_decide()? { + return Ok(()); + } + } + Some(PolyId::P { index }) => { + // Message for protocol actor + let poly_p = &mut self.ephemeral.poly_ps[*index]; + + let m_ctx = PolyPContext { + my_subtree_id: SubtreeId::PolyP { index: *index }, + inner: &mut self.inner, + solution_storage: &mut self.ephemeral.solution_storage, + }; + use SyncRunResult as Srr; + let blocker = poly_p.poly_recv_run( + m_ctx, + &self.protocol_description, + received.recipient, + payload_predicate, + payload, + )?; + log!( + &mut self.inner.logger, + "... 
Fed the msg to PolyP {:?} and ran it to blocker {:?}", + subtree_id, + blocker + ); + match blocker { + Srr::NoBranches => return Err(SyncErr::Inconsistent), + Srr::BlockingForRecv | Srr::AllBranchesComplete => { + { + let peeked = self + .ephemeral + .solution_storage + .peek_new_locals() + .collect::>(); + log!( + &mut self.inner.logger, + "Got {} new controller-local solutions from RECV: {:?}", + peeked.len(), + peeked + ); + } + if self.handle_locals_maybe_decide()? { + return Ok(()); + } + } + } + } + }; + } + } + } + } +} +impl ControllerEphemeral { + fn is_clear(&self) -> bool { + self.solution_storage.is_clear() + && self.poly_n.is_none() + && self.poly_ps.is_empty() + && self.mono_ps.is_empty() + && self.port_to_holder.is_empty() + } + fn clear(&mut self) { + self.solution_storage.clear(); + self.poly_n.take(); + self.poly_ps.clear(); + self.port_to_holder.clear(); + } +} +impl Into for MonoP { + fn into(self) -> PolyP { + PolyP { + complete: Default::default(), + incomplete: hashmap! { + Predicate::new_trivial() => + BranchP { + state: self.state, + inbox: Default::default(), + outbox: Default::default(), + blocking_on: None, + } + }, + ports: self.ports, + } + } +} + +impl From for SyncErr { + fn from(e: EndpointErr) -> SyncErr { + SyncErr::EndpointErr(e) + } +} + +impl MonoContext for MonoPContext<'_> { + type D = ProtocolD; + type S = ProtocolS; + fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S) { + log!( + &mut self.inner.logger, + "!! 
MonoContext callback to new_component with ports {:?}!", + &moved_ports, + ); + if moved_ports.is_subset(self.ports) { + self.ports.retain(|x| !moved_ports.contains(x)); + self.mono_ps.push(MonoP { state: init_state, ports: moved_ports }); + } else { + panic!("MachineP attempting to move alien port!"); + } + } + fn new_channel(&mut self) -> [PortId; 2] { + let [a, b] = Endpoint::new_memory_pair(); + let channel_id = self.inner.channel_id_stream.next(); + + let mut clos = |endpoint, polarity| { + let endpoint_ext = + EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint }; + let port = self.inner.endpoint_exts.alloc(endpoint_ext); + let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint; + let token = PortId::to_token(port); + self.inner + .messenger_state + .poll + .register(endpoint, token, Ready::readable(), PollOpt::edge()) + .expect("AAGAGGGGG"); + self.ports.insert(port); + port + }; + let [kp, kg] = [clos(a, Putter), clos(b, Getter)]; + log!( + &mut self.inner.logger, + "!! MonoContext callback to new_channel. returning ports {:?}!", + [kp, kg], + ); + [kp, kg] + } + fn new_random(&mut self) -> u64 { + type Bytes8 = [u8; std::mem::size_of::()]; + let mut bytes = Bytes8::default(); + getrandom::getrandom(&mut bytes).unwrap(); + let val = unsafe { std::mem::transmute::(bytes) }; + log!( + &mut self.inner.logger, + "!! MonoContext callback to new_random. 
returning val {:?}!", + val, + ); + val + } +} + +impl SolutionStorage { + fn is_clear(&self) -> bool { + self.subtree_id_to_index.is_empty() + && self.subtree_solutions.is_empty() + && self.old_local.is_empty() + && self.new_local.is_empty() + } + fn clear(&mut self) { + self.subtree_id_to_index.clear(); + self.subtree_solutions.clear(); + self.old_local.clear(); + self.new_local.clear(); + } + pub(crate) fn reset(&mut self, subtree_ids: impl Iterator) { + self.subtree_id_to_index.clear(); + self.subtree_solutions.clear(); + self.old_local.clear(); + self.new_local.clear(); + for key in subtree_ids { + self.subtree_id_to_index.insert(key, self.subtree_solutions.len()); + self.subtree_solutions.push(Default::default()) + } + } + + pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { + self.new_local.iter() + } + + pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { + let Self { old_local, new_local, .. } = self; + new_local.drain().map(move |local| { + old_local.insert(local.clone()); + local + }) + } + + pub(crate) fn submit_and_digest_subtree_solution( + &mut self, + logger: &mut String, + subtree_id: SubtreeId, + predicate: Predicate, + ) { + log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate); + let index = self.subtree_id_to_index[&subtree_id]; + let left = 0..index; + let right = (index + 1)..self.subtree_solutions.len(); + + let Self { subtree_solutions, new_local, old_local, .. 
} = self; + let was_new = subtree_solutions[index].insert(predicate.clone()); + if was_new { + let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); + Self::elaborate_into_new_local_rec( + logger, + predicate, + set_visitor, + old_local, + new_local, + ); + } + } + + fn elaborate_into_new_local_rec<'a, 'b>( + logger: &mut String, + partial: Predicate, + mut set_visitor: impl Iterator> + Clone, + old_local: &'b HashSet, + new_local: &'a mut HashSet, + ) { + if let Some(set) = set_visitor.next() { + // incomplete solution. keep traversing + for pred in set.iter() { + if let Some(elaborated) = pred.union_with(&partial) { + Self::elaborate_into_new_local_rec( + logger, + elaborated, + set_visitor.clone(), + old_local, + new_local, + ) + } + } + } else { + // recursive stop condition. `partial` is a local subtree solution + if !old_local.contains(&partial) { + // ... and it hasn't been found before + log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial); + new_local.insert(partial); + } + } + } +} +impl PolyContext for BranchPContext<'_, '_> { + type D = ProtocolD; + + fn is_firing(&mut self, port: PortId) -> Option { + assert!(self.ports.contains(&port)); + let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; + let val = self.predicate.query(channel_id); + log!( + &mut self.m_ctx.inner.logger, + "!! PolyContext callback to is_firing by {:?}! returning {:?}", + self.m_ctx.my_subtree_id, + val, + ); + val + } + fn read_msg(&mut self, port: PortId) -> Option<&Payload> { + assert!(self.ports.contains(&port)); + let val = self.inbox.get(&port); + log!( + &mut self.m_ctx.inner.logger, + "!! PolyContext callback to read_msg by {:?}! 
returning {:?}", + self.m_ctx.my_subtree_id, + val, + ); + val + } +} diff --git a/src/runtime/connector.rs b/src/runtime/retired/connector.rs similarity index 100% rename from src/runtime/connector.rs rename to src/runtime/retired/connector.rs diff --git a/src/runtime/endpoint.rs b/src/runtime/retired/endpoint.rs similarity index 100% rename from src/runtime/endpoint.rs rename to src/runtime/retired/endpoint.rs diff --git a/src/runtime/errors.rs b/src/runtime/retired/errors.rs similarity index 100% rename from src/runtime/errors.rs rename to src/runtime/retired/errors.rs diff --git a/src/runtime/experimental/api.rs b/src/runtime/retired/experimental/api.rs similarity index 100% rename from src/runtime/experimental/api.rs rename to src/runtime/retired/experimental/api.rs diff --git a/src/runtime/experimental/bits.rs b/src/runtime/retired/experimental/bits.rs similarity index 100% rename from src/runtime/experimental/bits.rs rename to src/runtime/retired/experimental/bits.rs diff --git a/src/runtime/experimental/ecs.rs b/src/runtime/retired/experimental/ecs.rs similarity index 100% rename from src/runtime/experimental/ecs.rs rename to src/runtime/retired/experimental/ecs.rs diff --git a/src/runtime/experimental/mod.rs b/src/runtime/retired/experimental/mod.rs similarity index 100% rename from src/runtime/experimental/mod.rs rename to src/runtime/retired/experimental/mod.rs diff --git a/src/runtime/experimental/pdl.rs b/src/runtime/retired/experimental/pdl.rs similarity index 100% rename from src/runtime/experimental/pdl.rs rename to src/runtime/retired/experimental/pdl.rs diff --git a/src/runtime/experimental/predicate.rs b/src/runtime/retired/experimental/predicate.rs similarity index 100% rename from src/runtime/experimental/predicate.rs rename to src/runtime/retired/experimental/predicate.rs diff --git a/src/runtime/experimental/vec_storage.rs b/src/runtime/retired/experimental/vec_storage.rs similarity index 100% rename from 
src/runtime/experimental/vec_storage.rs rename to src/runtime/retired/experimental/vec_storage.rs diff --git a/src/runtime/ffi.rs b/src/runtime/retired/ffi.rs similarity index 100% rename from src/runtime/ffi.rs rename to src/runtime/retired/ffi.rs diff --git a/src/runtime/serde.rs b/src/runtime/retired/serde.rs similarity index 100% rename from src/runtime/serde.rs rename to src/runtime/retired/serde.rs diff --git a/src/runtime/setup.rs b/src/runtime/retired/setup.rs similarity index 100% rename from src/runtime/setup.rs rename to src/runtime/retired/setup.rs diff --git a/src/runtime/v2.rs b/src/runtime/retired/v2.rs similarity index 100% rename from src/runtime/v2.rs rename to src/runtime/retired/v2.rs diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs index bc747db7960b1b734461fd8007b59628e2468f98..bd6281a1ae2c776f19a4859f0958fbee6265004b 100644 --- a/src/runtime/setup2.rs +++ b/src/runtime/setup2.rs @@ -29,29 +29,37 @@ impl Connector { id_manager: IdManager::new(controller_id), native_ports: Default::default(), proto_components: Default::default(), - outp_to_inp: Default::default(), - inp_to_route: Default::default(), + port_info: Default::default(), phased: ConnectorPhased::Setup { endpoint_setups: Default::default(), surplus_sockets }, } } pub fn add_port_pair(&mut self) -> [PortId; 2] { - let o = self.id_manager.next_port(); - let i = self.id_manager.next_port(); - self.outp_to_inp.insert(o, i); - self.inp_to_route.insert(i, InpRoute::NativeComponent); + let route = Route::LocalComponent(LocalComponentId::Native); + let [o, i] = [self.id_manager.next_port(), self.id_manager.next_port()]; self.native_ports.insert(o); self.native_ports.insert(i); + // {polarity, peer, route} known. {} unknown. 
+ self.port_info.polarities.insert(o, Putter); + self.port_info.polarities.insert(i, Getter); + self.port_info.peers.insert(o, i); + self.port_info.peers.insert(i, o); + self.port_info.routes.insert(o, route); + self.port_info.routes.insert(i, route); log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } - pub fn add_net_port(&mut self, endpoint_setup: EndpointSetup) -> Result { + pub fn add_net_port( + &mut self, + polarity: Polarity, + endpoint_setup: EndpointSetup, + ) -> Result { match &mut self.phased { ConnectorPhased::Setup { endpoint_setups, .. } => { let p = self.id_manager.next_port(); self.native_ports.insert(p); - if endpoint_setup.polarity == Getter { - self.inp_to_route.insert(p, InpRoute::NativeComponent); - } + // {polarity, route} known. {peer} unknown. + self.port_info.polarities.insert(p, polarity); + self.port_info.routes.insert(p, Route::LocalComponent(LocalComponentId::Native)); log!(self.logger, "Added net port {:?} with info {:?} ", p, &endpoint_setup); endpoint_setups.push((p, endpoint_setup)); Ok(p) @@ -59,23 +67,6 @@ impl Connector { ConnectorPhased::Communication { .. } => Err(()), } } - fn check_polarity(&self, port: &PortId) -> Polarity { - if let ConnectorPhased::Setup { endpoint_setups, .. } = &self.phased { - for (setup_port, EndpointSetup { polarity, .. }) in endpoint_setups.iter() { - if setup_port == port { - // special case. 
this port's polarity isn't reflected by - // self.inp_to_route or self.outp_to_inp, because its still not paired to a peer - return *polarity; - } - } - } - if self.outp_to_inp.contains_key(port) { - Polarity::Putter - } else { - assert!(self.inp_to_route.contains_key(port)); - Polarity::Getter - } - } pub fn add_component( &mut self, identifier: &[u8], @@ -90,7 +81,7 @@ impl Connector { if !self.native_ports.contains(port) { return Err(UnknownPort(*port)); } - if expected_polarity != self.check_polarity(port) { + if expected_polarity != *self.port_info.polarities.get(port).unwrap() { return Err(WrongPortPolarity { port: *port, expected_polarity }); } } @@ -100,9 +91,11 @@ impl Connector { let proto_component_index = self.proto_components.len(); self.proto_components.push(proto_component); for port in ports.iter() { - if let Polarity::Getter = self.check_polarity(port) { - self.inp_to_route - .insert(*port, InpRoute::ProtoComponent { index: proto_component_index }); + if let Polarity::Getter = *self.port_info.polarities.get(port).unwrap() { + self.port_info.routes.insert( + *port, + Route::LocalComponent(LocalComponentId::Proto { index: proto_component_index }), + ); } } Ok(()) @@ -118,22 +111,8 @@ impl Connector { let deadline = Instant::now() + timeout; // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = { - let Self { outp_to_inp, inp_to_route, logger, .. } = self; - let logical_channel_callback = |lci: LogicalChannelInfo| { - if let Putter = lci.local_polarity { - outp_to_inp.insert(lci.local_port, lci.peer_port); - inp_to_route.insert( - lci.peer_port, - InpRoute::Endpoint { index: lci.endpoint_index }, - ); - } - }; - new_endpoint_manager( - &mut **logger, - endpoint_setups, - logical_channel_callback, - deadline, - )? + let Self { logger, port_info, .. } = self; + new_endpoint_manager(&mut **logger, endpoint_setups, port_info, deadline)? 
}; log!( self.logger, @@ -153,6 +132,10 @@ impl Connector { endpoint_manager, neighborhood, mem_inbox: Default::default(), + native_actor: NativeActor::Nonsync { + sync_result_branch: None, + next_batches: vec![SyncBatch::default()], + }, }; Ok(()) } @@ -163,14 +146,13 @@ impl Connector { fn new_endpoint_manager( logger: &mut dyn Logger, endpoint_setups: &[(PortId, EndpointSetup)], - mut logical_channel_callback: impl FnMut(LogicalChannelInfo), + port_info: &mut PortInfo, deadline: Instant, ) -> Result { //////////////////////////////////////////// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); struct Todo { todo_endpoint: TodoEndpoint, - endpoint_setup: EndpointSetup, local_port: PortId, sent_local_port: bool, // true <-> I've sent my local port recv_peer_port: Option, // Some(..) <-> I've received my peer's port @@ -194,20 +176,15 @@ fn new_endpoint_manager( poll.registry().register(&mut listener, token, BOTH).unwrap(); TodoEndpoint::Listener(listener) }; - Ok(Todo { - todo_endpoint, - endpoint_setup: endpoint_setup.clone(), - local_port, - sent_local_port: false, - recv_peer_port: None, - }) + Ok(Todo { todo_endpoint, local_port, sent_local_port: false, recv_peer_port: None }) }; //////////////////////////////////////////// // 1. Start to construct EndpointManager let mut poll = Poll::new().map_err(drop)?; let mut events = Events::with_capacity(64); - let mut undrained_endpoints = IndexSet::::default(); + let mut polled_undrained = IndexSet::::default(); + let mut delayed_messages = vec![]; // 2. 
create a registered (TcpListener/Endpoint) for passive / active respectively let mut todos = endpoint_setups @@ -234,7 +211,7 @@ fn new_endpoint_manager( let (mut stream, peer_addr) = listener.accept().map_err(drop)?; poll.registry().deregister(listener).unwrap(); poll.registry().register(&mut stream, token, BOTH).unwrap(); - log!(logger, "Endpoint({}) accepted a connection from {:?}", index, peer_addr); + log!(logger, "Endpoint[{}] accepted a connection from {:?}", index, peer_addr); let endpoint = Endpoint { stream, inbox: vec![] }; todo.todo_endpoint = TodoEndpoint::Endpoint(endpoint); } @@ -242,35 +219,49 @@ fn new_endpoint_manager( Todo { todo_endpoint: TodoEndpoint::Endpoint(endpoint), local_port, - endpoint_setup, sent_local_port, recv_peer_port, + .. } => { if !setup_incomplete.contains(&index) { continue; } + let local_polarity = *port_info.polarities.get(local_port).unwrap(); if event.is_writable() && !*sent_local_port { - let msg = - MyPortInfo { polarity: endpoint_setup.polarity, port: *local_port }; + let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo { + polarity: local_polarity, + port: *local_port, + })); endpoint.send(&msg)?; - log!(logger, "endpoint[{}] sent peer info {:?}", index, &msg); + log!(logger, "endpoint[{}] sent msg {:?}", index, &msg); *sent_local_port = true; } if event.is_readable() && recv_peer_port.is_none() { - undrained_endpoints.insert(index); - if let Some(peer_port_info) = - endpoint.try_recv::().map_err(drop)? 
- { - log!(logger, "endpoint[{}] got peer info {:?}", index, peer_port_info); - assert!(peer_port_info.polarity != endpoint_setup.polarity); - *recv_peer_port = Some(peer_port_info.port); - let lci = LogicalChannelInfo { - local_port: *local_port, - peer_port: peer_port_info.port, - local_polarity: endpoint_setup.polarity, - endpoint_index: index, - }; - logical_channel_callback(lci); + let maybe_msg = endpoint.try_recv().map_err(drop)?; + if maybe_msg.is_some() && !endpoint.inbox.is_empty() { + polled_undrained.insert(index); + } + match maybe_msg { + None => {} // msg deserialization incomplete + Some(Msg::SetupMsg(SetupMsg::MyPortInfo(peer_info))) => { + log!(logger, "endpoint[{}] got peer info {:?}", index, peer_info); + assert!(peer_info.polarity != local_polarity); + *recv_peer_port = Some(peer_info.port); + // 1. finally learned the peer of this port! + port_info.peers.insert(*local_port, peer_info.port); + // 2. learned the info of this peer port + port_info.polarities.insert(peer_info.port, peer_info.polarity); + port_info.peers.insert(peer_info.port, *local_port); + port_info.routes.insert(peer_info.port, Route::Endpoint { index }); + } + Some(inappropriate_msg) => { + log!( + logger, + "delaying msg {:?} during channel setup phase", + inappropriate_msg + ); + delayed_messages.push((index, inappropriate_msg)); + } } } if *sent_local_port && recv_peer_port.is_some() { @@ -296,9 +287,9 @@ fn new_endpoint_manager( Ok(EndpointManager { poll, events, - undrained_endpoints, + polled_undrained, + undelayed_messages: delayed_messages, // no longer delayed delayed_messages: Default::default(), - undelayed_messages: Default::default(), endpoint_exts, }) } @@ -309,11 +300,7 @@ fn init_neighborhood( em: &mut EndpointManager, deadline: Instant, ) -> Result { - //////////////////////////////////////////// - use Msg::SetupMsg as S; - use SetupMsg::*; - //////////////////////////////////////////// - + use {Msg::SetupMsg as S, SetupMsg::*}; log!(logger, "beginning 
neighborhood construction"); // 1. broadcast my ID as the first echo. await reply from all neighbors let echo = S(LeaderEcho { maybe_leader: controller_id }); @@ -330,7 +317,7 @@ fn init_neighborhood( let mut my_leader = controller_id; em.undelay_all(); 'echo_loop: while !awaiting.is_empty() || parent.is_some() { - let (index, msg) = em.try_recv_any(deadline).map_err(drop)?; + let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?; log!(logger, "GOT from index {:?} msg {:?}", &index, &msg); match msg { S(LeaderAnnounce { leader }) => { @@ -381,7 +368,10 @@ fn init_neighborhood( } } } - inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)), + inappropriate_msg => { + log!(logger, "delaying msg {:?} during echo phase", inappropriate_msg); + em.delayed_messages.push((index, inappropriate_msg)) + } } } match parent { @@ -413,21 +403,27 @@ fn init_neighborhood( ee.endpoint.send(msg)?; } let mut children = Vec::default(); + log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages); em.undelay_all(); + log!(logger, "delayed {:?} undelayed {:?}", &em.delayed_messages, &em.undelayed_messages); while !awaiting.is_empty() { - let (index, msg) = em.try_recv_any(deadline).map_err(drop)?; + log!(logger, "awaiting {:?}", &awaiting); + let (index, msg) = em.try_recv_any(logger, deadline).map_err(drop)?; match msg { S(YouAreMyParent) => { assert!(awaiting.remove(&index)); children.push(index); } - S(SetupMsg::LeaderAnnounce { leader }) => { + S(LeaderAnnounce { leader }) => { assert!(awaiting.remove(&index)); assert!(leader == my_leader); assert!(Some(index) != parent); // they wouldn't send me this if they considered me their parent } - inappropriate_msg => em.delayed_messages.push((index, inappropriate_msg)), + inappropriate_msg => { + log!(logger, "delaying msg {:?} during echo-reply phase", inappropriate_msg); + em.delayed_messages.push((index, inappropriate_msg)); + } } } children.sort();