diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index e619b0712bbc97002d4b7cb762fd5e264fafdb49..4ddefc9f54cef6af8c49f37b6db0e5cd6cf5eb4c 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -1,739 +1,736 @@ +use super::*; use crate::common::*; -use crate::runtime::{actors::*, endpoint::*, errors::*, *}; - -impl Controller { - fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> { - log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); - let ret = match &decision { - Decision::Success(predicate) => { - // overwrite MonoN/P - self.inner.mono_n = { - let poly_n = self.ephemeral.poly_n.take().unwrap(); - poly_n.choose_mono(predicate).unwrap_or_else(|| { - panic!( - "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}", - &predicate, &poly_n.branches, &self.inner.logger - ); - }) - }; - self.inner.mono_ps.clear(); - self.inner.mono_ps.extend( - self.ephemeral - .poly_ps - .drain(..) - .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), - ); - Ok(()) - } - Decision::Failure => Err(SyncErr::Timeout), - }; - let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); - for &child_port in self.inner.family.children_ports.iter() { - log!( - &mut self.inner.logger, - "Forwarding {:?} to child with port {:?}", - &announcement, - child_port - ); - self.inner - .endpoint_exts - .get_mut(child_port) - .expect("eefef") - .endpoint - .send(announcement.clone())?; - } - self.inner.round_index += 1; - self.ephemeral.clear(); - ret +impl NonsyncProtoContext<'_> { + pub fn new_component(&mut self, moved_ports: HashSet, init_state: ComponentState) { + let component = &mut self.connector.proto_components[self.proto_component_index]; + assert!(component.ports.is_subset(&moved_ports)); + // let polarities = self.proto_description.component_polarities(identifier).expect("BAD "); + // if polarities.len() != ports.len() { + // return Err(WrongNumberOfParamaters { expected: polarities.len() }); + // } + // for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { + // if !self.native_ports.contains(port) { + // return Err(UnknownPort(*port)); + // } + // if expected_polarity != self.check_polarity(port) { + // return Err(WrongPortPolarity { port: *port, expected_polarity }); + // } + // } + // // ok! + // let state = self.proto_description.new_main_component(identifier, ports); + // let proto_component = ProtoComponent { ports: ports.iter().copied().collect(), state }; + // let proto_component_index = self.proto_components.len(); + // self.proto_components.push(proto_component); + // for port in ports.iter() { + // if let Polarity::Getter = self.check_polarity(port) { + // self.inp_to_route.insert( + // *port, + // InpRoute::LocalComponent(LocalComponentId::Proto { + // index: proto_component_index, + // }), + // ); + // } + // } } - - // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found - fn handle_locals_maybe_decide(&mut self) -> Result { - if let Some(parent_port) = self.inner.family.parent_port { - // I have a parent -> I'm not the leader - let parent_endpoint = - &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; - for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { - let msg = - CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); - log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port); - parent_endpoint.send(msg)?; - } - Ok(false) - } else { - // I have no parent -> I'm the leader - assert!(self.inner.family.parent_port.is_none()); - let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); - Ok(if let Some(predicate) = maybe_predicate { - let decision = Decision::Success(predicate); - log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); - self.end_round_with_decision(decision)?; - true - } else { - false - }) - } + pub fn new_channel(&mut self) -> [PortId; 2] { + self.connector.add_port_pair() + } +} +impl SyncProtoContext<'_> { + pub fn is_firing(&mut self, port: PortId) -> Option { + todo!() + } + pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { + todo!() } +} - fn kick_off_native( - &mut self, - sync_batches: impl Iterator, - ) -> Result { - let MonoN { ports, .. } = self.inner.mono_n.clone(); - let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; - let mut branches = HashMap::<_, _>::default(); - for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { - let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id; - let all_ports = ports.iter().copied(); - let all_channel_ids = all_ports.map(port_to_channel_id); +// impl Connector { +// fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncError> { +// log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); +// let ret = match &decision { +// Decision::Success(predicate) => { +// // overwrite MonoN/P +// self.inner.mono_n = { +// let poly_n = self.ephemeral.poly_n.take().unwrap(); +// poly_n.choose_mono(predicate).unwrap_or_else(|| { +// panic!( +// "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}", +// &predicate, &poly_n.branches, &self.inner.logger +// ); +// }) +// }; +// self.inner.mono_ps.clear(); +// self.inner.mono_ps.extend( +// self.ephemeral +// .poly_ps +// .drain(..) +// .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), +// ); +// Ok(()) +// } +// Decision::Failure => Err(SyncError::Timeout), +// }; +// let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index); +// for &child_port in self.inner.family.children_ports.iter() { +// log!( +// &mut self.inner.logger, +// "Forwarding {:?} to child with port {:?}", +// &announcement, +// child_port +// ); +// self.inner +// .endpoint_exts +// .get_mut(child_port) +// .expect("eefef") +// .endpoint +// .send(announcement.clone())?; +// } +// self.inner.round_index += 1; +// self.ephemeral.clear(); +// ret +// } - let mut predicate = Predicate::new_trivial(); +// // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found +// fn handle_locals_maybe_decide(&mut self) -> Result { +// if let Some(parent_port) = self.inner.family.parent_port { +// // I have a parent -> I'm not the leader +// let parent_endpoint = +// &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; +// for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { +// let msg = +// CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index); +// log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port); +// parent_endpoint.send(msg)?; +// } +// Ok(false) +// } else { +// // I have no parent -> I'm the leader +// assert!(self.inner.family.parent_port.is_none()); +// let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); +// Ok(if let Some(predicate) = maybe_predicate { +// let decision = Decision::Success(predicate); +// log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision); +// self.end_round_with_decision(decision)?; +// true +// } else { +// false +// }) +// } +// } - // assign TRUE for puts and gets - let true_ports = puts.keys().chain(gets.iter()).copied(); - let true_channel_ids = true_ports.clone().map(port_to_channel_id); - predicate.batch_assign_nones(true_channel_ids, true); +// fn kick_off_native( +// &mut self, +// sync_batches: impl Iterator, +// ) -> Result { +// let MonoN { ports, .. } = self.inner.mono_n.clone(); +// let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; +// let mut branches = HashMap::<_, _>::default(); +// for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { +// let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id; +// let all_ports = ports.iter().copied(); +// let all_channel_ids = all_ports.map(port_to_channel_id); - // assign FALSE for all in interface not assigned true - predicate.batch_assign_nones(all_channel_ids.clone(), false); +// let mut predicate = Predicate::new_trivial(); - if branches.contains_key(&predicate) { - // TODO what do I do with redundant predicates? - unimplemented!( - "Duplicate predicate {:#?}!\nHaving multiple batches with the same - predicate requires the support of oracle boolean variables", - &predicate, - ) - } - let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; - for (port, payload) in puts { - log!( - &mut self.inner.logger, - "... ... Initial native put msg {:?} pred {:?} batch {:?}", - &payload, - &predicate, - sync_batch_index, - ); - let msg = - CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } - .into_msg(*round_index); - endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; - } - log!( - &mut self.inner.logger, - "... Initial native branch batch index={} with pred {:?}", - sync_batch_index, - &predicate - ); - if branch.to_get.is_empty() { - self.ephemeral.solution_storage.submit_and_digest_subtree_solution( - &mut self.inner.logger, - SubtreeId::PolyN, - predicate.clone(), - ); - } - branches.insert(predicate, branch); - } - Ok(PolyN { ports, branches }) - } - pub fn sync_round( - &mut self, - deadline: Option, - sync_batches: Option>, - ) -> Result<(), SyncErr> { - if let Some(e) = self.unrecoverable_error { - return Err(e.clone()); - } - self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e { - SyncErr::Timeout => e, // this isn't unrecoverable - _ => { - // Must set unrecoverable error! and tear down our net channels - self.unrecoverable_error = Some(e); - self.ephemeral.clear(); - self.inner.endpoint_exts = Default::default(); - e - } - }) - } +// // assign TRUE for puts and gets +// let true_ports = puts.keys().chain(gets.iter()).copied(); +// let true_channel_ids = true_ports.clone().map(port_to_channel_id); +// predicate.batch_assign_nones(true_channel_ids, true); - // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. - // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. - fn sync_round_inner( - &mut self, - mut deadline: Option, - sync_batches: Option>, - ) -> Result<(), SyncErr> { - log!( - &mut self.inner.logger, - "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", - self.inner.round_index - ); - assert!(self.ephemeral.is_clear()); - assert!(self.unrecoverable_error.is_none()); +// // assign FALSE for all in interface not assigned true +// predicate.batch_assign_nones(all_channel_ids.clone(), false); - // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). - // Some actors are dropped. some new actors are created. - // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors - self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned()); - log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); - while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { - let mut m_ctx = MonoPContext { - ports: &mut mono_p.ports, - mono_ps: &mut self.ephemeral.mono_ps, - inner: &mut self.inner, - }; - // cross boundary into crate::protocol - let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); - log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); - match blocker { - MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent), - MonoBlocker::ComponentExit => drop(mono_p), - MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()), - } - } - log!( - &mut self.inner.logger, - "Finished running all MonoPs! Have {} PolyPs waiting", - self.ephemeral.poly_ps.len() - ); +// if branches.contains_key(&predicate) { +// // TODO what do I do with redundant predicates? +// unimplemented!( +// "Duplicate predicate {:#?}!\nHaving multiple batches with the same +// predicate requires the support of oracle boolean variables", +// &predicate, +// ) +// } +// let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; +// for (port, payload) in puts { +// log!( +// &mut self.inner.logger, +// "... ... Initial native put msg {:?} pred {:?} batch {:?}", +// &payload, +// &predicate, +// sync_batch_index, +// ); +// let msg = +// CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } +// .into_msg(*round_index); +// endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; +// } +// log!( +// &mut self.inner.logger, +// "... Initial native branch batch index={} with pred {:?}", +// sync_batch_index, +// &predicate +// ); +// if branch.to_get.is_empty() { +// self.ephemeral.solution_storage.submit_and_digest_subtree_solution( +// &mut self.inner.logger, +// SubtreeId::PolyN, +// predicate.clone(), +// ); +// } +// branches.insert(predicate, branch); +// } +// Ok(PolyN { ports, branches }) +// } +// pub fn sync_round( +// &mut self, +// deadline: Option, +// sync_batches: Option>, +// ) -> Result<(), SyncError> { +// if let Some(e) = self.unrecoverable_error { +// return Err(e.clone()); +// } +// self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e { +// SyncError::Timeout => e, // this isn't unrecoverable +// _ => { +// // Must set unrecoverable error! and tear down our net channels +// self.unrecoverable_error = Some(e); +// self.ephemeral.clear(); +// self.inner.endpoint_exts = Default::default(); +// e +// } +// }) +// } - // 3. define the mapping from port -> actor - // this is needed during the event loop to determine which actor - // should receive the incoming message. - // TODO: store and update this mapping rather than rebuilding it each round. - let port_to_holder: HashMap = { - use PolyId::*; - let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N)); - let p = self - .ephemeral - .poly_ps - .iter() - .enumerate() - .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index }))); - n.chain(p).collect() - }; - log!( - &mut self.inner.logger, - "SET OF PolyPs and MonoPs final! port lookup map is {:?}", - &port_to_holder - ); +// // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. +// // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. +// fn sync_round_inner( +// &mut self, +// mut deadline: Option, +// sync_batches: Option>, +// ) -> Result<(), SyncError> { +// log!( +// &mut self.inner.logger, +// "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", +// self.inner.round_index +// ); +// assert!(self.ephemeral.is_clear()); +// assert!(self.unrecoverable_error.is_none()); - // 4. Create the solution storage. it tracks the solutions of "subtrees" - // of the controller in the overlay tree. - self.ephemeral.solution_storage.reset({ - let n = std::iter::once(SubtreeId::PolyN); - let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index }); - let c = self - .inner - .family - .children_ports - .iter() - .map(|&port| SubtreeId::ChildController { port }); - let subtree_id_iter = n.chain(m).chain(c); - log!( - &mut self.inner.logger, - "Solution Storage has subtree Ids: {:?}", - &subtree_id_iter.clone().collect::>() - ); - subtree_id_iter - }); +// // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). +// // Some actors are dropped. some new actors are created. +// // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors +// self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned()); +// log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); +// while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { +// let mut m_ctx = ProtoSyncContext { +// ports: &mut mono_p.ports, +// mono_ps: &mut self.ephemeral.mono_ps, +// inner: &mut self.inner, +// }; +// // cross boundary into crate::protocol +// let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); +// log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); +// match blocker { +// NonsyncBlocker::Inconsistent => return Err(SyncError::Inconsistent), +// NonsyncBlocker::ComponentExit => drop(mono_p), +// NonsyncBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()), +// } +// } +// log!( +// &mut self.inner.logger, +// "Finished running all MonoPs! Have {} PolyPs waiting", +// self.ephemeral.poly_ps.len() +// ); - // 5. kick off the synchronous round of the native actor if it exists +// // 3. define the mapping from port -> actor +// // this is needed during the event loop to determine which actor +// // should receive the incoming message. +// // TODO: store and update this mapping rather than rebuilding it each round. +// let port_to_holder: HashMap = { +// use PolyId::*; +// let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N)); +// let p = self +// .ephemeral +// .poly_ps +// .iter() +// .enumerate() +// .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index }))); +// n.chain(p).collect() +// }; +// log!( +// &mut self.inner.logger, +// "SET OF PolyPs and MonoPs final! port lookup map is {:?}", +// &port_to_holder +// ); - log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); - self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { - // using if let because of nested ? operator - // TODO check that there are 1+ branches or NO SOLUTION - let poly_n = self.kick_off_native(sync_batches)?; - log!( - &mut self.inner.logger, - "PolyN kicked off, and has branches with predicates... {:?}", - poly_n.branches.keys().collect::>() - ); - Some(poly_n) - } else { - log!(&mut self.inner.logger, "NO NATIVE COMPONENT"); - None - }; +// // 4. Create the solution storage. it tracks the solutions of "subtrees" +// // of the controller in the overlay tree. +// self.ephemeral.solution_storage.reset({ +// let n = std::iter::once(SubtreeId::PolyN); +// let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index }); +// let c = self +// .inner +// .family +// .children_ports +// .iter() +// .map(|&port| SubtreeId::ChildController { port }); +// let subtree_id_iter = n.chain(m).chain(c); +// log!( +// &mut self.inner.logger, +// "Solution Storage has subtree Ids: {:?}", +// &subtree_id_iter.clone().collect::>() +// ); +// subtree_id_iter +// }); - // 6. Kick off the synchronous round of each protocol actor - // If just one actor becomes inconsistent now, there can be no solution! - // TODO distinguish between completed and not completed poly_p's? - log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); - for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { - let my_subtree_id = SubtreeId::PolyP { index }; - let m_ctx = PolyPContext { - my_subtree_id, - inner: &mut self.inner, - solution_storage: &mut self.ephemeral.solution_storage, - }; - use SyncRunResult as Srr; - let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; - log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker); - match blocker { - Srr::NoBranches => return Err(SyncErr::Inconsistent), - Srr::AllBranchesComplete | Srr::BlockingForRecv => (), - } - } - log!(&mut self.inner.logger, "All Poly machines have been kicked off!"); +// // 5. kick off the synchronous round of the native actor if it exists - // 7. `solution_storage` may have new solutions for this controller - // handle their discovery. LEADER => announce, otherwise => send to parent - { - let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); - log!( - &mut self.inner.logger, - "Got {} controller-local solutions before a single RECV: {:?}", - peeked.len(), - peeked - ); - } - if self.handle_locals_maybe_decide()? { - return Ok(()); - } +// log!(&mut self.inner.logger, "Kicking off native's synchronous round..."); +// self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { +// // using if let because of nested ? operator +// // TODO check that there are 1+ branches or NO SOLUTION +// let poly_n = self.kick_off_native(sync_batches)?; +// log!( +// &mut self.inner.logger, +// "PolyN kicked off, and has branches with predicates... {:?}", +// poly_n.branches.keys().collect::>() +// ); +// Some(poly_n) +// } else { +// log!(&mut self.inner.logger, "NO NATIVE COMPONENT"); +// None +// }; - // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error - log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages"); - self.undelay_all(); - 'recv_loop: loop { - log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline); - let received = match deadline { - None => { - // we have personally timed out. perform a "long" poll. - self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP") - } - Some(d) => match self.recv(d)? { - // we have not yet timed out. performed a time-limited poll - Some(received) => received, - None => { - // timed out! send a FAILURE message to the sink, - // and henceforth don't time out on polling. - deadline = None; - match self.inner.family.parent_port { - None => { - // I am the sink! announce failure and return. - return self.end_round_with_decision(Decision::Failure); - } - Some(parent_port) => { - // I am not the sink! send a failure message. - let announcement = Msg::CommMsg(CommMsg { - round_index: self.inner.round_index, - contents: CommMsgContents::Failure, - }); - log!( - &mut self.inner.logger, - "Forwarding {:?} to parent with port {:?}", - &announcement, - parent_port - ); - self.inner - .endpoint_exts - .get_mut(parent_port) - .expect("ss") - .endpoint - .send(announcement.clone())?; - continue; // poll some more - } - } - } - }, - }; - log!(&mut self.inner.logger, "::: message {:?}...", &received); - let current_content = match received.msg { - Msg::SetupMsg(s) => { - // This occurs in the event the connector was malformed during connect() - println!("WASNT EXPECTING {:?}", s); - return Err(SyncErr::UnexpectedSetupMsg); - } - Msg::CommMsg(CommMsg { round_index, .. }) - if round_index < self.inner.round_index => - { - // Old message! Can safely discard - log!(&mut self.inner.logger, "...and its OLD! :("); - drop(received); - continue 'recv_loop; - } - Msg::CommMsg(CommMsg { round_index, .. }) - if round_index > self.inner.round_index => - { - // Message from a next round. Keep for later! - log!(&mut self.inner.logger, "... DELAY! :("); - self.delay(received); - continue 'recv_loop; - } - Msg::CommMsg(CommMsg { contents, round_index }) => { - log!( - &mut self.inner.logger, - "... its a round-appropriate CommMsg with port {:?}", - received.recipient - ); - assert_eq!(round_index, self.inner.round_index); - contents - } - }; - match current_content { - CommMsgContents::Failure => match self.inner.family.parent_port { - Some(parent_port) => { - let announcement = Msg::CommMsg(CommMsg { - round_index: self.inner.round_index, - contents: CommMsgContents::Failure, - }); - log!( - &mut self.inner.logger, - "Forwarding {:?} to parent with port {:?}", - &announcement, - parent_port - ); - self.inner - .endpoint_exts - .get_mut(parent_port) - .expect("ss") - .endpoint - .send(announcement.clone())?; - } - None => return self.end_round_with_decision(Decision::Failure), - }, - CommMsgContents::Elaborate { partial_oracle } => { - // Child controller submitted a subtree solution. - if !self.inner.family.children_ports.contains(&received.recipient) { - return Err(SyncErr::ElaborateFromNonChild); - } - let subtree_id = SubtreeId::ChildController { port: received.recipient }; - log!( - &mut self.inner.logger, - "Received elaboration from child for subtree {:?}: {:?}", - subtree_id, - &partial_oracle - ); - self.ephemeral.solution_storage.submit_and_digest_subtree_solution( - &mut self.inner.logger, - subtree_id, - partial_oracle, - ); - if self.handle_locals_maybe_decide()? { - return Ok(()); - } - } - CommMsgContents::Announce { decision } => { - if self.inner.family.parent_port != Some(received.recipient) { - return Err(SyncErr::AnnounceFromNonParent); - } - log!( - &mut self.inner.logger, - "Received ANNOUNCEMENT from from parent {:?}: {:?}", - received.recipient, - &decision - ); - return self.end_round_with_decision(decision); - } - CommMsgContents::SendPayload { payload_predicate, payload } => { - // check that we expect to be able to receive payloads from this sender - assert_eq!( - Getter, - self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity - ); +// // 6. Kick off the synchronous round of each protocol actor +// // If just one actor becomes inconsistent now, there can be no solution! +// // TODO distinguish between completed and not completed poly_p's? +// log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); +// for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { +// let my_subtree_id = SubtreeId::PolyP { index }; +// let m_ctx = PolyPContext { +// my_subtree_id, +// inner: &mut self.inner, +// solution_storage: &mut self.ephemeral.solution_storage, +// }; +// use SyncRunResult as Srr; +// let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; +// log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker); +// match blocker { +// Srr::NoBranches => return Err(SyncError::Inconsistent), +// Srr::AllBranchesComplete | Srr::BlockingForRecv => (), +// } +// } +// log!(&mut self.inner.logger, "All Poly machines have been kicked off!"); - // message for some actor. Feed it to the appropriate actor - // and then give them another chance to run. - let subtree_id = port_to_holder.get(&received.recipient); - log!( - &mut self.inner.logger, - "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", - subtree_id, - &payload_predicate, - &payload - ); - let channel_id = self - .inner - .endpoint_exts - .get(received.recipient) - .expect("UEHFU") - .info - .channel_id; - if payload_predicate.query(channel_id) != Some(true) { - // sender didn't preserve the invariant - return Err(SyncErr::PayloadPremiseExcludesTheChannel(channel_id)); - } - match subtree_id { - None => { - // this happens when a message is sent to a component that has exited. - // It's safe to drop this message; - // The sender branch will certainly not be part of the solution - } - Some(PolyId::N) => { - // Message for NativeMachine - self.ephemeral.poly_n.as_mut().unwrap().sync_recv( - received.recipient, - &mut self.inner.logger, - payload, - payload_predicate, - &mut self.ephemeral.solution_storage, - ); - if self.handle_locals_maybe_decide()? { - return Ok(()); - } - } - Some(PolyId::P { index }) => { - // Message for protocol actor - let poly_p = &mut self.ephemeral.poly_ps[*index]; +// // 7. `solution_storage` may have new solutions for this controller +// // handle their discovery. LEADER => announce, otherwise => send to parent +// { +// let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); +// log!( +// &mut self.inner.logger, +// "Got {} controller-local solutions before a single RECV: {:?}", +// peeked.len(), +// peeked +// ); +// } +// if self.handle_locals_maybe_decide()? { +// return Ok(()); +// } - let m_ctx = PolyPContext { - my_subtree_id: SubtreeId::PolyP { index: *index }, - inner: &mut self.inner, - solution_storage: &mut self.ephemeral.solution_storage, - }; - use SyncRunResult as Srr; - let blocker = poly_p.poly_recv_run( - m_ctx, - &self.protocol_description, - received.recipient, - payload_predicate, - payload, - )?; - log!( - &mut self.inner.logger, - "... Fed the msg to PolyP {:?} and ran it to blocker {:?}", - subtree_id, - blocker - ); - match blocker { - Srr::NoBranches => return Err(SyncErr::Inconsistent), - Srr::BlockingForRecv | Srr::AllBranchesComplete => { - { - let peeked = self - .ephemeral - .solution_storage - .peek_new_locals() - .collect::>(); - log!( - &mut self.inner.logger, - "Got {} new controller-local solutions from RECV: {:?}", - peeked.len(), - peeked - ); - } - if self.handle_locals_maybe_decide()? { - return Ok(()); - } - } - } - } - }; - } - } - } - } -} -impl ControllerEphemeral { - fn is_clear(&self) -> bool { - self.solution_storage.is_clear() - && self.poly_n.is_none() - && self.poly_ps.is_empty() - && self.mono_ps.is_empty() - && self.port_to_holder.is_empty() - } - fn clear(&mut self) { - self.solution_storage.clear(); - self.poly_n.take(); - self.poly_ps.clear(); - self.port_to_holder.clear(); - } -} -impl Into for MonoP { - fn into(self) -> PolyP { - PolyP { - complete: Default::default(), - incomplete: hashmap! { - Predicate::new_trivial() => - BranchP { - state: self.state, - inbox: Default::default(), - outbox: Default::default(), - blocking_on: None, - } - }, - ports: self.ports, - } - } -} +// // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error +// log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages"); +// self.undelay_all(); +// 'recv_loop: loop { +// log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline); +// let received = match deadline { +// None => { +// // we have personally timed out. perform a "long" poll. +// self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP") +// } +// Some(d) => match self.recv(d)? { +// // we have not yet timed out. performed a time-limited poll +// Some(received) => received, +// None => { +// // timed out! send a FAILURE message to the sink, +// // and henceforth don't time out on polling. +// deadline = None; +// match self.inner.family.parent_port { +// None => { +// // I am the sink! announce failure and return. +// return self.end_round_with_decision(Decision::Failure); +// } +// Some(parent_port) => { +// // I am not the sink! send a failure message. +// let announcement = Msg::CommMsg(CommMsg { +// round_index: self.inner.round_index, +// contents: CommMsgContents::Failure, +// }); +// log!( +// &mut self.inner.logger, +// "Forwarding {:?} to parent with port {:?}", +// &announcement, +// parent_port +// ); +// self.inner +// .endpoint_exts +// .get_mut(parent_port) +// .expect("ss") +// .endpoint +// .send(announcement.clone())?; +// continue; // poll some more +// } +// } +// } +// }, +// }; +// log!(&mut self.inner.logger, "::: message {:?}...", &received); +// let current_content = match received.msg { +// Msg::SetupMsg(s) => { +// // This occurs in the event the connector was malformed during connect() +// println!("WASNT EXPECTING {:?}", s); +// return Err(SyncError::UnexpectedSetupMsg); +// } +// Msg::CommMsg(CommMsg { round_index, .. }) +// if round_index < self.inner.round_index => +// { +// // Old message! Can safely discard +// log!(&mut self.inner.logger, "...and its OLD! :("); +// drop(received); +// continue 'recv_loop; +// } +// Msg::CommMsg(CommMsg { round_index, .. }) +// if round_index > self.inner.round_index => +// { +// // Message from a next round. Keep for later! +// log!(&mut self.inner.logger, "... DELAY! :("); +// self.delay(received); +// continue 'recv_loop; +// } +// Msg::CommMsg(CommMsg { contents, round_index }) => { +// log!( +// &mut self.inner.logger, +// "... its a round-appropriate CommMsg with port {:?}", +// received.recipient +// ); +// assert_eq!(round_index, self.inner.round_index); +// contents +// } +// }; +// match current_content { +// CommMsgContents::Failure => match self.inner.family.parent_port { +// Some(parent_port) => { +// let announcement = Msg::CommMsg(CommMsg { +// round_index: self.inner.round_index, +// contents: CommMsgContents::Failure, +// }); +// log!( +// &mut self.inner.logger, +// "Forwarding {:?} to parent with port {:?}", +// &announcement, +// parent_port +// ); +// self.inner +// .endpoint_exts +// .get_mut(parent_port) +// .expect("ss") +// .endpoint +// .send(announcement.clone())?; +// } +// None => return self.end_round_with_decision(Decision::Failure), +// }, +// CommMsgContents::Elaborate { partial_oracle } => { +// // Child controller submitted a subtree solution. +// if !self.inner.family.children_ports.contains(&received.recipient) { +// return Err(SyncError::ElaborateFromNonChild); +// } +// let subtree_id = SubtreeId::ChildController { port: received.recipient }; +// log!( +// &mut self.inner.logger, +// "Received elaboration from child for subtree {:?}: {:?}", +// subtree_id, +// &partial_oracle +// ); +// self.ephemeral.solution_storage.submit_and_digest_subtree_solution( +// &mut self.inner.logger, +// subtree_id, +// partial_oracle, +// ); +// if self.handle_locals_maybe_decide()? { +// return Ok(()); +// } +// } +// CommMsgContents::Announce { decision } => { +// if self.inner.family.parent_port != Some(received.recipient) { +// return Err(SyncError::AnnounceFromNonParent); +// } +// log!( +// &mut self.inner.logger, +// "Received ANNOUNCEMENT from from parent {:?}: {:?}", +// received.recipient, +// &decision +// ); +// return self.end_round_with_decision(decision); +// } +// CommMsgContents::SendPayload { payload_predicate, payload } => { +// // check that we expect to be able to receive payloads from this sender +// assert_eq!( +// Getter, +// self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity +// ); -impl From for SyncErr { - fn from(e: EndpointErr) -> SyncErr { - SyncErr::EndpointErr(e) - } -} +// // message for some actor. Feed it to the appropriate actor +// // and then give them another chance to run. +// let subtree_id = port_to_holder.get(&received.recipient); +// log!( +// &mut self.inner.logger, +// "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", +// subtree_id, +// &payload_predicate, +// &payload +// ); +// let channel_id = self +// .inner +// .endpoint_exts +// .get(received.recipient) +// .expect("UEHFU") +// .info +// .channel_id; +// if payload_predicate.query(channel_id) != Some(true) { +// // sender didn't preserve the invariant +// return Err(SyncError::PayloadPremiseExcludesTheChannel(channel_id)); +// } +// match subtree_id { +// None => { +// // this happens when a message is sent to a component that has exited. +// // It's safe to drop this message; +// // The sender branch will certainly not be part of the solution +// } +// Some(PolyId::N) => { +// // Message for NativeMachine +// self.ephemeral.poly_n.as_mut().unwrap().sync_recv( +// received.recipient, +// &mut self.inner.logger, +// payload, +// payload_predicate, +// &mut self.ephemeral.solution_storage, +// ); +// if self.handle_locals_maybe_decide()? { +// return Ok(()); +// } +// } +// Some(PolyId::P { index }) => { +// // Message for protocol actor +// let poly_p = &mut self.ephemeral.poly_ps[*index]; -impl MonoContext for MonoPContext<'_> { - type D = ProtocolD; - type S = ProtocolS; - fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S) { - log!( - &mut self.inner.logger, - "!! MonoContext callback to new_component with ports {:?}!", - &moved_ports, - ); - if moved_ports.is_subset(self.ports) { - self.ports.retain(|x| !moved_ports.contains(x)); - self.mono_ps.push(MonoP { state: init_state, ports: moved_ports }); - } else { - panic!("MachineP attempting to move alien port!"); - } - } - fn new_channel(&mut self) -> [PortId; 2] { - let [a, b] = Endpoint::new_memory_pair(); - let channel_id = self.inner.channel_id_stream.next(); +// let m_ctx = PolyPContext { +// my_subtree_id: SubtreeId::PolyP { index: *index }, +// inner: &mut self.inner, +// solution_storage: &mut self.ephemeral.solution_storage, +// }; +// use SyncRunResult as Srr; +// let blocker = poly_p.poly_recv_run( +// m_ctx, +// &self.protocol_description, +// received.recipient, +// payload_predicate, +// payload, +// )?; +// log!( +// &mut self.inner.logger, +// "... Fed the msg to PolyP {:?} and ran it to blocker {:?}", +// subtree_id, +// blocker +// ); +// match blocker { +// Srr::NoBranches => return Err(SyncError::Inconsistent), +// Srr::BlockingForRecv | Srr::AllBranchesComplete => { +// { +// let peeked = self +// .ephemeral +// .solution_storage +// .peek_new_locals() +// .collect::>(); +// log!( +// &mut self.inner.logger, +// "Got {} new controller-local solutions from RECV: {:?}", +// peeked.len(), +// peeked +// ); +// } +// if self.handle_locals_maybe_decide()? { +// return Ok(()); +// } +// } +// } +// } +// }; +// } +// } +// } +// } +// } +// impl ControllerEphemeral { +// fn is_clear(&self) -> bool { +// self.solution_storage.is_clear() +// && self.poly_n.is_none() +// && self.poly_ps.is_empty() +// && self.mono_ps.is_empty() +// && self.port_to_holder.is_empty() +// } +// fn clear(&mut self) { +// self.solution_storage.clear(); +// self.poly_n.take(); +// self.poly_ps.clear(); +// self.port_to_holder.clear(); +// } +// } +// impl Into for MonoP { +// fn into(self) -> PolyP { +// PolyP { +// complete: Default::default(), +// incomplete: hashmap! { +// Predicate::new_trivial() => +// BranchP { +// state: self.state, +// inbox: Default::default(), +// outbox: Default::default(), +// blocking_on: None, +// } +// }, +// ports: self.ports, +// } +// } +// } - let mut clos = |endpoint, polarity| { - let endpoint_ext = - EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint }; - let port = self.inner.endpoint_exts.alloc(endpoint_ext); - let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint; - let token = PortId::to_token(port); - self.inner - .messenger_state - .poll - .register(endpoint, token, Ready::readable(), PollOpt::edge()) - .expect("AAGAGGGGG"); - self.ports.insert(port); - port - }; - let [kp, kg] = [clos(a, Putter), clos(b, Getter)]; - log!( - &mut self.inner.logger, - "!! MonoContext callback to new_channel. returning ports {:?}!", - [kp, kg], - ); - [kp, kg] - } - fn new_random(&mut self) -> u64 { - type Bytes8 = [u8; std::mem::size_of::()]; - let mut bytes = Bytes8::default(); - getrandom::getrandom(&mut bytes).unwrap(); - let val = unsafe { std::mem::transmute::(bytes) }; - log!( - &mut self.inner.logger, - "!! MonoContext callback to new_random. returning val {:?}!", - val, - ); - val - } -} +// impl From for SyncError { +// fn from(e: EndpointError) -> SyncError { +// SyncError::EndpointError(e) +// } +// } -impl SolutionStorage { - fn is_clear(&self) -> bool { - self.subtree_id_to_index.is_empty() - && self.subtree_solutions.is_empty() - && self.old_local.is_empty() - && self.new_local.is_empty() - } - fn clear(&mut self) { - self.subtree_id_to_index.clear(); - self.subtree_solutions.clear(); - self.old_local.clear(); - self.new_local.clear(); - } - pub(crate) fn reset(&mut self, subtree_ids: impl Iterator) { - self.subtree_id_to_index.clear(); - self.subtree_solutions.clear(); - self.old_local.clear(); - self.new_local.clear(); - for key in subtree_ids { - self.subtree_id_to_index.insert(key, self.subtree_solutions.len()); - self.subtree_solutions.push(Default::default()) - } - } +// impl ProtoSyncContext<'_> { +// fn new_component(&mut self, moved_ports: HashSet, init_state: Self::S) { +// todo!() +// } +// fn new_channel(&mut self) -> [PortId; 2] { +// todo!() +// } +// } - pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { - self.new_local.iter() - } +// impl SolutionStorage { +// fn is_clear(&self) -> bool { +// self.subtree_id_to_index.is_empty() +// && self.subtree_solutions.is_empty() +// && self.old_local.is_empty() +// && self.new_local.is_empty() +// } +// fn clear(&mut self) { +// self.subtree_id_to_index.clear(); +// self.subtree_solutions.clear(); +// self.old_local.clear(); +// self.new_local.clear(); +// } +// pub(crate) fn reset(&mut self, subtree_ids: impl Iterator) { +// self.subtree_id_to_index.clear(); +// self.subtree_solutions.clear(); +// self.old_local.clear(); +// self.new_local.clear(); +// for key in subtree_ids { +// self.subtree_id_to_index.insert(key, self.subtree_solutions.len()); +// self.subtree_solutions.push(Default::default()) +// } +// } - pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { - let Self { old_local, new_local, .. } = self; - new_local.drain().map(move |local| { - old_local.insert(local.clone()); - local - }) - } +// pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { +// self.new_local.iter() +// } - pub(crate) fn submit_and_digest_subtree_solution( - &mut self, - logger: &mut String, - subtree_id: SubtreeId, - predicate: Predicate, - ) { - log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate); - let index = self.subtree_id_to_index[&subtree_id]; - let left = 0..index; - let right = (index + 1)..self.subtree_solutions.len(); +// pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { +// let Self { old_local, new_local, .. } = self; +// new_local.drain().map(move |local| { +// old_local.insert(local.clone()); +// local +// }) +// } - let Self { subtree_solutions, new_local, old_local, .. } = self; - let was_new = subtree_solutions[index].insert(predicate.clone()); - if was_new { - let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); - Self::elaborate_into_new_local_rec( - logger, - predicate, - set_visitor, - old_local, - new_local, - ); - } - } +// pub(crate) fn submit_and_digest_subtree_solution( +// &mut self, +// logger: &mut String, +// subtree_id: SubtreeId, +// predicate: Predicate, +// ) { +// log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate); +// let index = self.subtree_id_to_index[&subtree_id]; +// let left = 0..index; +// let right = (index + 1)..self.subtree_solutions.len(); - fn elaborate_into_new_local_rec<'a, 'b>( - logger: &mut String, - partial: Predicate, - mut set_visitor: impl Iterator> + Clone, - old_local: &'b HashSet, - new_local: &'a mut HashSet, - ) { - if let Some(set) = set_visitor.next() { - // incomplete solution. keep traversing - for pred in set.iter() { - if let Some(elaborated) = pred.union_with(&partial) { - Self::elaborate_into_new_local_rec( - logger, - elaborated, - set_visitor.clone(), - old_local, - new_local, - ) - } - } - } else { - // recursive stop condition. `partial` is a local subtree solution - if !old_local.contains(&partial) { - // ... and it hasn't been found before - log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial); - new_local.insert(partial); - } - } - } -} -impl PolyContext for BranchPContext<'_, '_> { - type D = ProtocolD; +// let Self { subtree_solutions, new_local, old_local, .. } = self; +// let was_new = subtree_solutions[index].insert(predicate.clone()); +// if was_new { +// let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); +// Self::elaborate_into_new_local_rec( +// logger, +// predicate, +// set_visitor, +// old_local, +// new_local, +// ); +// } +// } - fn is_firing(&mut self, port: PortId) -> Option { - assert!(self.ports.contains(&port)); - let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; - let val = self.predicate.query(channel_id); - log!( - &mut self.m_ctx.inner.logger, - "!! PolyContext callback to is_firing by {:?}! returning {:?}", - self.m_ctx.my_subtree_id, - val, - ); - val - } - fn read_msg(&mut self, port: PortId) -> Option<&Payload> { - assert!(self.ports.contains(&port)); - let val = self.inbox.get(&port); - log!( - &mut self.m_ctx.inner.logger, - "!! PolyContext callback to read_msg by {:?}! returning {:?}", - self.m_ctx.my_subtree_id, - val, - ); - val - } -} +// fn elaborate_into_new_local_rec<'a, 'b>( +// logger: &mut String, +// partial: Predicate, +// mut set_visitor: impl Iterator> + Clone, +// old_local: &'b HashSet, +// new_local: &'a mut HashSet, +// ) { +// if let Some(set) = set_visitor.next() { +// // incomplete solution. keep traversing +// for pred in set.iter() { +// if let Some(elaborated) = pred.union_with(&partial) { +// Self::elaborate_into_new_local_rec( +// logger, +// elaborated, +// set_visitor.clone(), +// old_local, +// new_local, +// ) +// } +// } +// } else { +// // recursive stop condition. `partial` is a local subtree solution +// if !old_local.contains(&partial) { +// // ... and it hasn't been found before +// log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial); +// new_local.insert(partial); +// } +// } +// } +// } +// impl PolyContext for BranchPContext<'_, '_> { +// type D = ProtocolD; + +// fn is_firing(&mut self, port: PortId) -> Option { +// assert!(self.ports.contains(&port)); +// let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id; +// let val = self.predicate.query(channel_id); +// log!( +// &mut self.m_ctx.inner.logger, +// "!! PolyContext callback to is_firing by {:?}! returning {:?}", +// self.m_ctx.my_subtree_id, +// val, +// ); +// val +// } +// fn read_msg(&mut self, port: PortId) -> Option<&Payload> { +// assert!(self.ports.contains(&port)); +// let val = self.inbox.get(&port); +// log!( +// &mut self.m_ctx.inner.logger, +// "!! PolyContext callback to read_msg by {:?}! returning {:?}", +// self.m_ctx.my_subtree_id, +// val, +// ); +// val +// } +// }