diff --git a/src/runtime/retired/communication.rs b/src/runtime/retired/communication.rs
new file mode 100644
index 0000000000000000000000000000000000000000..e619b0712bbc97002d4b7cb762fd5e264fafdb49
--- /dev/null
+++ b/src/runtime/retired/communication.rs
@@ -0,0 +1,739 @@
+use crate::common::*;
+use crate::runtime::{actors::*, endpoint::*, errors::*, *};
+
+impl Controller {
+    fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncErr> {
+        log!(&mut self.inner.logger, "ENDING ROUND WITH DECISION! {:?}", &decision);
+        let ret = match &decision {
+            Decision::Success(predicate) => {
+                // overwrite MonoN/P
+                self.inner.mono_n = {
+                    let poly_n = self.ephemeral.poly_n.take().unwrap();
+                    poly_n.choose_mono(predicate).unwrap_or_else(|| {
+                        panic!(
+                            "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}",
+                            &predicate, &poly_n.branches, &self.inner.logger
+                        );
+                    })
+                };
+                self.inner.mono_ps.clear();
+                self.inner.mono_ps.extend(
+                    self.ephemeral
+                        .poly_ps
+                        .drain(..)
+                        .map(|poly_p| poly_p.choose_mono(predicate).unwrap()),
+                );
+                Ok(())
+            }
+            Decision::Failure => Err(SyncErr::Timeout),
+        };
+        let announcement = CommMsgContents::Announce { decision }.into_msg(self.inner.round_index);
+        for &child_port in self.inner.family.children_ports.iter() {
+            log!(
+                &mut self.inner.logger,
+                "Forwarding {:?} to child with port {:?}",
+                &announcement,
+                child_port
+            );
+            self.inner
+                .endpoint_exts
+                .get_mut(child_port)
+                .expect("eefef")
+                .endpoint
+                .send(announcement.clone())?;
+        }
+        self.inner.round_index += 1;
+        self.ephemeral.clear();
+        ret
+    }
+
+    // Drain self.ephemeral.solution_storage and handle the new locals.
+    // Returns Ok(true) iff a decision was reached.
+    fn handle_locals_maybe_decide(&mut self) -> Result<bool, SyncErr> {
+        if let Some(parent_port) = self.inner.family.parent_port {
+            // I have a parent -> I'm not the leader
+            let parent_endpoint =
+                &mut self.inner.endpoint_exts.get_mut(parent_port).expect("huu").endpoint;
+            for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() {
+                let msg =
+                    CommMsgContents::Elaborate { partial_oracle }.into_msg(self.inner.round_index);
+                log!(&mut self.inner.logger, "Sending {:?} to parent {:?}", &msg, parent_port);
+                parent_endpoint.send(msg)?;
+            }
+            Ok(false)
+        } else {
+            // I have no parent -> I'm the leader
+            assert!(self.inner.family.parent_port.is_none());
+            let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next();
+            Ok(if let Some(predicate) = maybe_predicate {
+                let decision = Decision::Success(predicate);
+                log!(&mut self.inner.logger, "DECIDE ON {:?} AS LEADER!", &decision);
+                self.end_round_with_decision(decision)?;
+                true
+            } else {
+                false
+            })
+        }
+    }
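+
+    // A minimal sketch of the decision flow implemented by the two methods above,
+    // assuming a three-controller overlay tree (the controller names are
+    // illustrative, not identifiers from this codebase):
+    //
+    //   leaf   --Elaborate{partial_oracle}--> middle
+    //   middle --Elaborate{unioned oracle}--> leader   (parent_port == None)
+    //   leader: the first new local solution becomes Decision::Success(predicate),
+    //           which end_round_with_decision broadcasts down children_ports.
+    //
+    // A timeout travels the same upward path as CommMsgContents::Failure, and the
+    // leader then announces Decision::Failure instead.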
+
+    fn kick_off_native(
+        &mut self,
+        sync_batches: impl Iterator<Item = SyncBatch>,
+    ) -> Result<PolyN, SyncErr> {
+        let MonoN { ports, .. } = self.inner.mono_n.clone();
+        let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self;
+        let mut branches = HashMap::<_, _>::default();
+        for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() {
+            let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id;
+            let all_ports = ports.iter().copied();
+            let all_channel_ids = all_ports.map(port_to_channel_id);
+
+            let mut predicate = Predicate::new_trivial();
+
+            // assign TRUE for puts and gets
+            let true_ports = puts.keys().chain(gets.iter()).copied();
+            let true_channel_ids = true_ports.clone().map(port_to_channel_id);
+            predicate.batch_assign_nones(true_channel_ids, true);
+
+            // assign FALSE for all in interface not assigned true
+            predicate.batch_assign_nones(all_channel_ids.clone(), false);
+
+            if branches.contains_key(&predicate) {
+                // TODO what do I do with redundant predicates?
+                unimplemented!(
+                    "Duplicate predicate {:#?}! Having multiple batches with the same \
+                     predicate requires the support of oracle boolean variables",
+                    &predicate,
+                )
+            }
+            let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index };
+            for (port, payload) in puts {
+                log!(
+                    &mut self.inner.logger,
+                    "... ... Initial native put msg {:?} pred {:?} batch {:?}",
+                    &payload,
+                    &predicate,
+                    sync_batch_index,
+                );
+                let msg =
+                    CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload }
+                        .into_msg(*round_index);
+                endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?;
+            }
+            log!(
+                &mut self.inner.logger,
+                "... Initial native branch batch index={} with pred {:?}",
+                sync_batch_index,
+                &predicate
+            );
+            if branch.to_get.is_empty() {
+                self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
+                    &mut self.inner.logger,
+                    SubtreeId::PolyN,
+                    predicate.clone(),
+                );
+            }
+            branches.insert(predicate, branch);
+        }
+        Ok(PolyN { ports, branches })
+    }
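+
+    // An illustrative example of the batch-to-predicate mapping above (the ports
+    // and channels here are hypothetical). For a native with interface ports
+    // {a, b, c} and a SyncBatch { puts: {a: payload}, gets: {b} }:
+    //   - the channels of a and b are assigned TRUE (they fire in this branch),
+    //   - the remaining interface channel of c is assigned FALSE (it stays silent),
+    // yielding the branch predicate { ch(a): T, ch(b): T, ch(c): F }. A second
+    // batch inducing the same predicate is rejected by the unimplemented!() arm.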
+
+    pub fn sync_round(
+        &mut self,
+        deadline: Option<Instant>,
+        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
+    ) -> Result<(), SyncErr> {
+        if let Some(e) = &self.unrecoverable_error {
+            return Err(e.clone());
+        }
+        self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e {
+            SyncErr::Timeout => e, // this isn't unrecoverable
+            _ => {
+                // Must set unrecoverable error! and tear down our net channels
+                self.unrecoverable_error = Some(e.clone());
+                self.ephemeral.clear();
+                self.inner.endpoint_exts = Default::default();
+                e
+            }
+        })
+    }
+
+    // Runs a synchronous round until all the actors are in a decided state OR 1+ are inconsistent.
+    // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches.
+    fn sync_round_inner(
+        &mut self,
+        mut deadline: Option<Instant>,
+        sync_batches: Option<impl Iterator<Item = SyncBatch>>,
+    ) -> Result<(), SyncErr> {
+        log!(
+            &mut self.inner.logger,
+            "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~",
+            self.inner.round_index
+        );
+        assert!(self.ephemeral.is_clear());
+        assert!(self.unrecoverable_error.is_none());
+
+        // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`).
+        //    Some actors are dropped; some new actors are created.
+        //    Ultimately, we have 0 Mono actors and a list of unnamed sync actors.
+        self.ephemeral.mono_ps.extend(self.inner.mono_ps.iter().cloned());
+        log!(&mut self.inner.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len());
+        while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() {
+            let mut m_ctx = MonoPContext {
+                ports: &mut mono_p.ports,
+                mono_ps: &mut self.ephemeral.mono_ps,
+                inner: &mut self.inner,
+            };
+            // cross boundary into crate::protocol
+            let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description);
+            log!(&mut self.inner.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker);
+            match blocker {
+                MonoBlocker::Inconsistent => return Err(SyncErr::Inconsistent),
+                MonoBlocker::ComponentExit => drop(mono_p),
+                MonoBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()),
+            }
+        }
+        log!(
+            &mut self.inner.logger,
+            "Finished running all MonoPs! Have {} PolyPs waiting",
+            self.ephemeral.poly_ps.len()
+        );
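+
+        // The pre_sync_run outcomes above, summarized:
+        //   Inconsistent   => the whole round fails with SyncErr::Inconsistent,
+        //   ComponentExit  => the actor is dropped for good,
+        //   SyncBlockStart => the actor is promoted to a PolyP
+        //                     (via `impl Into<PolyP> for MonoP` below).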
+
+        // 2. Define the mapping from port -> actor.
+        //    This is needed during the event loop to determine which actor
+        //    should receive the incoming message.
+        // TODO: store and update this mapping rather than rebuilding it each round.
+        let port_to_holder: HashMap<PortId, PolyId> = {
+            use PolyId::*;
+            let n = self.inner.mono_n.ports.iter().map(move |&e| (e, N));
+            let p = self
+                .ephemeral
+                .poly_ps
+                .iter()
+                .enumerate()
+                .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index })));
+            n.chain(p).collect()
+        };
+        log!(
+            &mut self.inner.logger,
+            "SET OF PolyPs and MonoPs final! port lookup map is {:?}",
+            &port_to_holder
+        );
+
+        // 3. Create the solution storage. It tracks the solutions of "subtrees"
+        //    of the controller in the overlay tree.
+        self.ephemeral.solution_storage.reset({
+            let n = std::iter::once(SubtreeId::PolyN);
+            let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index });
+            let c = self
+                .inner
+                .family
+                .children_ports
+                .iter()
+                .map(|&port| SubtreeId::ChildController { port });
+            let subtree_id_iter = n.chain(m).chain(c);
+            log!(
+                &mut self.inner.logger,
+                "Solution Storage has subtree Ids: {:?}",
+                &subtree_id_iter.clone().collect::<Vec<_>>()
+            );
+            subtree_id_iter
+        });
+
+        // 4. Kick off the synchronous round of the native actor if it exists.
+        log!(&mut self.inner.logger, "Kicking off native's synchronous round...");
+        self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches {
+            // using if let because of nested ? operator
+            // TODO check that there are 1+ branches or NO SOLUTION
+            let poly_n = self.kick_off_native(sync_batches)?;
+            log!(
+                &mut self.inner.logger,
+                "PolyN kicked off, and has branches with predicates... {:?}",
+                poly_n.branches.keys().collect::<Vec<_>>()
+            );
+            Some(poly_n)
+        } else {
+            log!(&mut self.inner.logger, "NO NATIVE COMPONENT");
+            None
+        };
+
+        // 5. Kick off the synchronous round of each protocol actor.
+        //    If just one actor becomes inconsistent now, there can be no solution!
+        // TODO distinguish between completed and not completed poly_p's?
+        log!(&mut self.inner.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len());
+        for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() {
+            let my_subtree_id = SubtreeId::PolyP { index };
+            let m_ctx = PolyPContext {
+                my_subtree_id,
+                inner: &mut self.inner,
+                solution_storage: &mut self.ephemeral.solution_storage,
+            };
+            use SyncRunResult as Srr;
+            let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?;
+            log!(&mut self.inner.logger, "... PolyP's poly_run got blocker {:?}", &blocker);
+            match blocker {
+                Srr::NoBranches => return Err(SyncErr::Inconsistent),
+                Srr::AllBranchesComplete | Srr::BlockingForRecv => (),
+            }
+        }
+        log!(&mut self.inner.logger, "All Poly machines have been kicked off!");
+
+        // 6. `solution_storage` may have new solutions for this controller.
+        //    Handle their discovery. LEADER => announce, otherwise => send to parent.
+        {
+            let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::<Vec<_>>();
+            log!(
+                &mut self.inner.logger,
+                "Got {} controller-local solutions before a single RECV: {:?}",
+                peeked.len(),
+                peeked
+            );
+        }
+        if self.handle_locals_maybe_decide()? {
+            return Ok(());
+        }
+
+        // 7. Receive incoming messages until the DECISION is made OR some unrecoverable error.
+        log!(&mut self.inner.logger, "`No decision yet`. Time to recv messages");
+        self.undelay_all();
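+
+        // The receive loop below triages every CommMsg by comparing its round_index
+        // r against this controller's current round R:
+        //   r < R  => stale message from a completed round: drop it,
+        //   r > R  => early message from a future round: delay() it for later,
+        //   r == R => handle its contents (Failure / Elaborate / Announce / SendPayload).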
+        'recv_loop: loop {
+            log!(&mut self.inner.logger, "`POLLING` with deadline {:?}...", deadline);
+            let received = match deadline {
+                None => {
+                    // we have personally timed out. perform a "long" poll.
+                    self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP")
+                }
+                Some(d) => match self.recv(d)? {
+                    // we have not yet timed out. perform a time-limited poll.
+                    Some(received) => received,
+                    None => {
+                        // timed out! send a FAILURE message to the sink,
+                        // and henceforth don't time out on polling.
+                        deadline = None;
+                        match self.inner.family.parent_port {
+                            None => {
+                                // I am the sink! announce failure and return.
+                                return self.end_round_with_decision(Decision::Failure);
+                            }
+                            Some(parent_port) => {
+                                // I am not the sink! send a failure message.
+                                let announcement = Msg::CommMsg(CommMsg {
+                                    round_index: self.inner.round_index,
+                                    contents: CommMsgContents::Failure,
+                                });
+                                log!(
+                                    &mut self.inner.logger,
+                                    "Forwarding {:?} to parent with port {:?}",
+                                    &announcement,
+                                    parent_port
+                                );
+                                self.inner
+                                    .endpoint_exts
+                                    .get_mut(parent_port)
+                                    .expect("ss")
+                                    .endpoint
+                                    .send(announcement.clone())?;
+                                continue; // poll some more
+                            }
+                        }
+                    }
+                },
+            };
+            log!(&mut self.inner.logger, "::: message {:?}...", &received);
+            let current_content = match received.msg {
+                Msg::SetupMsg(s) => {
+                    // This occurs in the event the connector was malformed during connect()
+                    println!("WASNT EXPECTING {:?}", s);
+                    return Err(SyncErr::UnexpectedSetupMsg);
+                }
+                Msg::CommMsg(CommMsg { round_index, .. })
+                    if round_index < self.inner.round_index =>
+                {
+                    // Old message! Can safely discard.
+                    log!(&mut self.inner.logger, "...and its OLD! :(");
+                    drop(received);
+                    continue 'recv_loop;
+                }
+                Msg::CommMsg(CommMsg { round_index, .. })
+                    if round_index > self.inner.round_index =>
+                {
+                    // Message from a future round. Keep for later!
+                    log!(&mut self.inner.logger, "... DELAY! :(");
+                    self.delay(received);
+                    continue 'recv_loop;
+                }
+                Msg::CommMsg(CommMsg { contents, round_index }) => {
+                    log!(
+                        &mut self.inner.logger,
+                        "... its a round-appropriate CommMsg with port {:?}",
+                        received.recipient
+                    );
+                    assert_eq!(round_index, self.inner.round_index);
+                    contents
+                }
+            };
+            match current_content {
+                CommMsgContents::Failure => match self.inner.family.parent_port {
+                    Some(parent_port) => {
+                        let announcement = Msg::CommMsg(CommMsg {
+                            round_index: self.inner.round_index,
+                            contents: CommMsgContents::Failure,
+                        });
+                        log!(
+                            &mut self.inner.logger,
+                            "Forwarding {:?} to parent with port {:?}",
+                            &announcement,
+                            parent_port
+                        );
+                        self.inner
+                            .endpoint_exts
+                            .get_mut(parent_port)
+                            .expect("ss")
+                            .endpoint
+                            .send(announcement.clone())?;
+                    }
+                    None => return self.end_round_with_decision(Decision::Failure),
+                },
+                CommMsgContents::Elaborate { partial_oracle } => {
+                    // Child controller submitted a subtree solution.
+                    if !self.inner.family.children_ports.contains(&received.recipient) {
+                        return Err(SyncErr::ElaborateFromNonChild);
+                    }
+                    let subtree_id = SubtreeId::ChildController { port: received.recipient };
+                    log!(
+                        &mut self.inner.logger,
+                        "Received elaboration from child for subtree {:?}: {:?}",
+                        subtree_id,
+                        &partial_oracle
+                    );
+                    self.ephemeral.solution_storage.submit_and_digest_subtree_solution(
+                        &mut self.inner.logger,
+                        subtree_id,
+                        partial_oracle,
+                    );
+                    if self.handle_locals_maybe_decide()? {
+                        return Ok(());
+                    }
+                }
+                CommMsgContents::Announce { decision } => {
+                    if self.inner.family.parent_port != Some(received.recipient) {
+                        return Err(SyncErr::AnnounceFromNonParent);
+                    }
+                    log!(
+                        &mut self.inner.logger,
+                        "Received ANNOUNCEMENT from parent {:?}: {:?}",
+                        received.recipient,
+                        &decision
+                    );
+                    return self.end_round_with_decision(decision);
+                }
+                CommMsgContents::SendPayload { payload_predicate, payload } => {
+                    // check that we expect to be able to receive payloads from this sender
+                    assert_eq!(
+                        Getter,
+                        self.inner.endpoint_exts.get(received.recipient).unwrap().info.polarity
+                    );
+
+                    // message for some actor. Feed it to the appropriate actor
+                    // and then give them another chance to run.
+                    let subtree_id = port_to_holder.get(&received.recipient);
+                    log!(
+                        &mut self.inner.logger,
+                        "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}",
+                        subtree_id,
+                        &payload_predicate,
+                        &payload
+                    );
+                    let channel_id = self
+                        .inner
+                        .endpoint_exts
+                        .get(received.recipient)
+                        .expect("UEHFU")
+                        .info
+                        .channel_id;
+                    if payload_predicate.query(channel_id) != Some(true) {
+                        // sender didn't preserve the invariant
+                        return Err(SyncErr::PayloadPremiseExcludesTheChannel(channel_id));
+                    }
+                    match subtree_id {
+                        None => {
+                            // This happens when a message is sent to a component that has exited.
+                            // It's safe to drop this message;
+                            // the sender branch will certainly not be part of the solution.
+                        }
+                        Some(PolyId::N) => {
+                            // Message for NativeMachine
+                            self.ephemeral.poly_n.as_mut().unwrap().sync_recv(
+                                received.recipient,
+                                &mut self.inner.logger,
+                                payload,
+                                payload_predicate,
+                                &mut self.ephemeral.solution_storage,
+                            );
+                            if self.handle_locals_maybe_decide()? {
+                                return Ok(());
+                            }
+                        }
+                        Some(PolyId::P { index }) => {
+                            // Message for protocol actor
+                            let poly_p = &mut self.ephemeral.poly_ps[*index];
+
+                            let m_ctx = PolyPContext {
+                                my_subtree_id: SubtreeId::PolyP { index: *index },
+                                inner: &mut self.inner,
+                                solution_storage: &mut self.ephemeral.solution_storage,
+                            };
+                            use SyncRunResult as Srr;
+                            let blocker = poly_p.poly_recv_run(
+                                m_ctx,
+                                &self.protocol_description,
+                                received.recipient,
+                                payload_predicate,
+                                payload,
+                            )?;
+                            log!(
+                                &mut self.inner.logger,
+                                "... Fed the msg to PolyP {:?} and ran it to blocker {:?}",
+                                subtree_id,
+                                blocker
+                            );
+                            match blocker {
+                                Srr::NoBranches => return Err(SyncErr::Inconsistent),
+                                Srr::BlockingForRecv | Srr::AllBranchesComplete => {
+                                    {
+                                        let peeked = self
+                                            .ephemeral
+                                            .solution_storage
+                                            .peek_new_locals()
+                                            .collect::<Vec<_>>();
+                                        log!(
+                                            &mut self.inner.logger,
+                                            "Got {} new controller-local solutions from RECV: {:?}",
+                                            peeked.len(),
+                                            peeked
+                                        );
+                                    }
+                                    if self.handle_locals_maybe_decide()? {
+                                        return Ok(());
+                                    }
+                                }
+                            }
+                        }
+                    };
+                }
+            }
+        }
+    }
+}
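+
+// `ControllerEphemeral` holds per-round scratch state. The intended invariant,
+// relied upon by the asserts at the top of sync_round_inner, is:
+//   round ends (decision or unrecoverable error) => clear() is called,
+//   next round begins                            => is_clear() returns true.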
+impl ControllerEphemeral {
+    fn is_clear(&self) -> bool {
+        self.solution_storage.is_clear()
+            && self.poly_n.is_none()
+            && self.poly_ps.is_empty()
+            && self.mono_ps.is_empty()
+            && self.port_to_holder.is_empty()
+    }
+    fn clear(&mut self) {
+        self.solution_storage.clear();
+        self.poly_n.take();
+        self.poly_ps.clear();
+        self.mono_ps.clear(); // also reset mono_ps, so that is_clear() holds afterwards
+        self.port_to_holder.clear();
+    }
+}
+
+impl Into<PolyP> for MonoP {
+    fn into(self) -> PolyP {
+        PolyP {
+            complete: Default::default(),
+            incomplete: hashmap! {
+                Predicate::new_trivial() =>
+                    BranchP {
+                        state: self.state,
+                        inbox: Default::default(),
+                        outbox: Default::default(),
+                        blocking_on: None,
+                    }
+            },
+            ports: self.ports,
+        }
+    }
+}
+
+impl From<EndpointErr> for SyncErr {
+    fn from(e: EndpointErr) -> SyncErr {
+        SyncErr::EndpointErr(e)
+    }
+}
+
+impl MonoContext for MonoPContext<'_> {
+    type D = ProtocolD;
+    type S = ProtocolS;
+    fn new_component(&mut self, moved_ports: HashSet<PortId>, init_state: Self::S) {
+        log!(
+            &mut self.inner.logger,
+            "!! MonoContext callback to new_component with ports {:?}!",
+            &moved_ports,
+        );
+        if moved_ports.is_subset(self.ports) {
+            self.ports.retain(|x| !moved_ports.contains(x));
+            self.mono_ps.push(MonoP { state: init_state, ports: moved_ports });
+        } else {
+            panic!("MachineP attempting to move alien port!");
+        }
+    }
+    fn new_channel(&mut self) -> [PortId; 2] {
+        let [a, b] = Endpoint::new_memory_pair();
+        let channel_id = self.inner.channel_id_stream.next();
+
+        let mut clos = |endpoint, polarity| {
+            let endpoint_ext =
+                EndpointExt { info: EndpointInfo { polarity, channel_id }, endpoint };
+            let port = self.inner.endpoint_exts.alloc(endpoint_ext);
+            let endpoint = &self.inner.endpoint_exts.get(port).unwrap().endpoint;
+            let token = PortId::to_token(port);
+            self.inner
+                .messenger_state
+                .poll
+                .register(endpoint, token, Ready::readable(), PollOpt::edge())
+                .expect("AAGAGGGGG");
+            self.ports.insert(port);
+            port
+        };
+        let [kp, kg] = [clos(a, Putter), clos(b, Getter)];
+        log!(
+            &mut self.inner.logger,
+            "!! MonoContext callback to new_channel. returning ports {:?}!",
+            [kp, kg],
+        );
+        [kp, kg]
+    }
+    fn new_random(&mut self) -> u64 {
+        type Bytes8 = [u8; std::mem::size_of::<u64>()];
+        let mut bytes = Bytes8::default();
+        getrandom::getrandom(&mut bytes).unwrap();
+        let val = unsafe { std::mem::transmute::<Bytes8, u64>(bytes) };
+        log!(
+            &mut self.inner.logger,
+            "!! MonoContext callback to new_random. returning val {:?}!",
+            val,
+        );
+        val
+    }
+}
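+
+// Note on new_random: the transmute from [u8; 8] to u64 is sound but produces a
+// native-endianness-dependent value; since the result is only used as randomness,
+// this is harmless. The safe equivalent in std is `u64::from_ne_bytes(bytes)`.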
+
+impl SolutionStorage {
+    fn is_clear(&self) -> bool {
+        self.subtree_id_to_index.is_empty()
+            && self.subtree_solutions.is_empty()
+            && self.old_local.is_empty()
+            && self.new_local.is_empty()
+    }
+    fn clear(&mut self) {
+        self.subtree_id_to_index.clear();
+        self.subtree_solutions.clear();
+        self.old_local.clear();
+        self.new_local.clear();
+    }
+    pub(crate) fn reset(&mut self, subtree_ids: impl Iterator<Item = SubtreeId>) {
+        self.subtree_id_to_index.clear();
+        self.subtree_solutions.clear();
+        self.old_local.clear();
+        self.new_local.clear();
+        for key in subtree_ids {
+            self.subtree_id_to_index.insert(key, self.subtree_solutions.len());
+            self.subtree_solutions.push(Default::default())
+        }
+    }
+
+    pub(crate) fn peek_new_locals(&self) -> impl Iterator<Item = &Predicate> + '_ {
+        self.new_local.iter()
+    }
+
+    pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator<Item = Predicate> + '_ {
+        let Self { old_local, new_local, .. } = self;
+        new_local.drain().map(move |local| {
+            old_local.insert(local.clone());
+            local
+        })
+    }
+
+    pub(crate) fn submit_and_digest_subtree_solution(
+        &mut self,
+        logger: &mut String,
+        subtree_id: SubtreeId,
+        predicate: Predicate,
+    ) {
+        log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate);
+        let index = self.subtree_id_to_index[&subtree_id];
+        let left = 0..index;
+        let right = (index + 1)..self.subtree_solutions.len();
+
+        let Self { subtree_solutions, new_local, old_local, .. } = self;
+        let was_new = subtree_solutions[index].insert(predicate.clone());
+        if was_new {
+            let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]);
+            Self::elaborate_into_new_local_rec(
+                logger,
+                predicate,
+                set_visitor,
+                old_local,
+                new_local,
+            );
+        }
+    }
+
+    fn elaborate_into_new_local_rec<'a, 'b>(
+        logger: &mut String,
+        partial: Predicate,
+        mut set_visitor: impl Iterator<Item = &'b HashSet<Predicate>> + Clone,
+        old_local: &'b HashSet<Predicate>,
+        new_local: &'a mut HashSet<Predicate>,
+    ) {
+        if let Some(set) = set_visitor.next() {
+            // incomplete solution. keep traversing
+            for pred in set.iter() {
+                if let Some(elaborated) = pred.union_with(&partial) {
+                    Self::elaborate_into_new_local_rec(
+                        logger,
+                        elaborated,
+                        set_visitor.clone(),
+                        old_local,
+                        new_local,
+                    )
+                }
+            }
+        } else {
+            // recursive stop condition. `partial` is a local subtree solution
+            if !old_local.contains(&partial) {
+                // ... and it hasn't been found before
+                log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial);
+                new_local.insert(partial);
+            }
+        }
+    }
+}
+
+impl PolyContext for BranchPContext<'_, '_> {
+    type D = ProtocolD;
+
+    fn is_firing(&mut self, port: PortId) -> Option<bool> {
+        assert!(self.ports.contains(&port));
+        let channel_id = self.m_ctx.inner.endpoint_exts.get(port).unwrap().info.channel_id;
+        let val = self.predicate.query(channel_id);
+        log!(
+            &mut self.m_ctx.inner.logger,
+            "!! PolyContext callback to is_firing by {:?}! returning {:?}",
+            self.m_ctx.my_subtree_id,
+            val,
+        );
+        val
+    }
+    fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
+        assert!(self.ports.contains(&port));
+        let val = self.inbox.get(&port);
+        log!(
+            &mut self.m_ctx.inner.logger,
+            "!! PolyContext callback to read_msg by {:?}! returning {:?}",
+            self.m_ctx.my_subtree_id,
+            val,
+        );
+        val
+    }
+}
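+
+// A worked example of elaborate_into_new_local_rec, using hypothetical predicates
+// over channels x and y. Suppose the storage holds three subtree solution sets:
+//   PolyN:  { {x: T} }
+//   PolyP0: { {x: T, y: F} }
+//   PolyP1: { {y: F} }
+// Submitting {x: T} for PolyN visits the other two sets in turn, taking union_with
+// at each step: {x: T} U {x: T, y: F} = {x: T, y: F}, then U {y: F} is unchanged.
+// Having crossed every other set, {x: T, y: F} is recorded as a new local solution.
+// A union over contradictory assignments (e.g. x: T vs x: F) returns None, which
+// prunes that branch of the recursion.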