diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 3b949ca7953721d334e1fcc5b74545d9110f07d6..5c3fec227b398f25b91dbc615092e76437c16d76 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -1,5 +1,6 @@ use super::*; use crate::common::*; +use core::marker::PhantomData; //////////////// struct BranchingNative { @@ -10,6 +11,22 @@ struct NativeBranch { gotten: HashMap, to_get: HashSet, } +struct SolutionStorage { + old_local: HashSet, + new_local: HashSet, + // this pair acts as Route -> HashSet which is friendlier to iteration + subtree_solutions: Vec>, + subtree_id_to_index: HashMap, +} +struct BranchingProtoComponent { + ports: HashSet, + branches: HashMap, +} +#[derive(Clone)] +struct ProtoComponentBranch { + inbox: HashMap, + state: ComponentState, +} //////////////// impl NonsyncProtoContext<'_> { @@ -72,32 +89,43 @@ impl Connector { use SyncError::*; match &mut self.phased { ConnectorPhased::Setup { .. } => Err(NotConnected), - ConnectorPhased::Communication { native_batches, .. } => { + ConnectorPhased::Communication { native_batches, endpoint_manager, .. } => { // 1. run all proto components to Nonsync blockers + let mut branching_proto_components = + HashMap::::default(); let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = self.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); - while let Some((proto_component_id, mut proto_component)) = unrun_components.pop() { + while let Some((proto_component_id, mut component)) = unrun_components.pop() { + // TODO coalesce fields let mut ctx = NonsyncProtoContext { logger: &mut *self.logger, port_info: &mut self.port_info, id_manager: &mut self.id_manager, proto_component_id, unrun_components: &mut unrun_components, - proto_component_ports: &mut proto_component.ports, + proto_component_ports: &mut self + .proto_components + .get_mut(&proto_component_id) + .unwrap() + .ports, }; - match proto_component.state.nonsync_run(&mut ctx, &self.proto_description) { - NonsyncBlocker::ComponentExit => drop(proto_component), - NonsyncBlocker::Inconsistent => { + use NonsyncBlocker as B; + match component.state.nonsync_run(&mut ctx, &self.proto_description) { + B::ComponentExit => drop(component), + B::Inconsistent => { return Err(InconsistentProtoComponent(proto_component_id)) } - NonsyncBlocker::SyncBlockStart => { - self.proto_components.insert(proto_component_id, proto_component); + B::SyncBlockStart => { + branching_proto_components.insert( + proto_component_id, + BranchingProtoComponent::initial(component), + ); } } } - // all ports are GETTER - let mut mem_inbox: Vec<(PortId, SendPayloadMsg)> = vec![]; + // (Putter, ) + let mut payload_outbox: Vec<(PortId, SendPayloadMsg)> = vec![]; // 2. kick off the native let mut branching_native = BranchingNative { branches: Default::default() }; @@ -114,9 +142,8 @@ impl Connector { } // put all messages for (port, payload) in to_put { - let getter = *self.port_info.peers.get(&port).unwrap(); - mem_inbox.push(( - getter, + payload_outbox.push(( + port, SendPayloadMsg { payload_predicate: predicate.clone(), payload }, )); } @@ -125,535 +152,219 @@ impl Connector { return Err(IndistinguishableBatches([index, existing.index])); } } + + // create the solution storage + let mut solution_storage = { + let n = std::iter::once(Route::LocalComponent(LocalComponentId::Native)); + let c = self + .proto_components + .keys() + .map(|&id| Route::LocalComponent(LocalComponentId::Proto(id))); + let e = (0..endpoint_manager.endpoint_exts.len()) + .map(|index| Route::Endpoint { index }); + SolutionStorage::new(n.chain(c).chain(e)) + }; + + // run all proto components to their sync blocker + for (proto_component_id, proto_component) in branching_proto_components.iter_mut() { + let blocked = &mut proto_component.branches; + let [unblocked_from, unblocked_to] = [ + &mut HashMap::::default(), + &mut Default::default(), + ]; + std::mem::swap(unblocked_from, blocked); + while !unblocked_from.is_empty() { + for (mut predicate, mut branch) in unblocked_from.drain() { + let mut ctx = SyncProtoContext { + logger: &mut *self.logger, + predicate: &predicate, + proto_component_id: *proto_component_id, + inbox: &branch.inbox, + }; + use SyncBlocker as B; + match branch.state.sync_run(&mut ctx, &self.proto_description) { + B::Inconsistent => { + log!(self.logger, "Proto component {:?} branch with pred {:?} became inconsistent", proto_component_id, &predicate); + // discard forever + drop((predicate, branch)); + } + B::SyncBlockEnd => { + // todo falsify + solution_storage.submit_and_digest_subtree_solution( + &mut *self.logger, + Route::LocalComponent(LocalComponentId::Proto( + *proto_component_id, + )), + predicate.clone(), + ); + // make concrete all variables + for &port in proto_component.ports.iter() { + predicate.assigned.entry(port).or_insert(false); + } + // move to "blocked" + blocked.insert(predicate, branch); + } + B::CouldntReadMsg(port) => { + // move to "blocked" + assert!(predicate.query(port).is_none()); + assert!(!branch.inbox.contains_key(&port)); + blocked.insert(predicate, branch); + } + B::CouldntCheckFiring(port) => { + // sanity check + assert!(predicate.query(port).is_none()); + let var = self.port_info.firing_var_for(port); + // keep forks in "unblocked" + unblocked_to.insert( + predicate.clone().inserted(var, false), + branch.clone(), + ); + unblocked_to.insert(predicate.inserted(var, true), branch); + } + B::PutMsg(port, payload) => { + // sanity check + assert_eq!(Some(&Putter), self.port_info.polarities.get(&port)); + // overwrite assignment + let var = self.port_info.firing_var_for(port); + let was = predicate.assigned.insert(var, true); + if was == Some(false) { + log!(self.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var); + // discard forever + drop((predicate, branch)); + } else { + // keep in "unblocked" + payload_outbox.push(( + port, + SendPayloadMsg { + payload_predicate: predicate.clone(), + payload, + }, + )); + unblocked_to.insert(predicate, branch); + } + } + } + } + std::mem::swap(unblocked_from, unblocked_to); + } + } todo!() } } } } +impl BranchingProtoComponent { + fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { + let branch = ProtoComponentBranch { inbox: Default::default(), state }; + Self { ports, branches: hashmap! { Predicate::default() => branch } } + } +} +impl SolutionStorage { + fn new(routes: impl Iterator) -> Self { + let mut subtree_id_to_index: HashMap = Default::default(); + let mut subtree_solutions = vec![]; + for key in routes { + subtree_id_to_index.insert(key, subtree_solutions.len()); + subtree_solutions.push(Default::default()) + } + Self { + subtree_solutions, + subtree_id_to_index, + old_local: Default::default(), + new_local: Default::default(), + } + } + fn is_clear(&self) -> bool { + self.subtree_id_to_index.is_empty() + && self.subtree_solutions.is_empty() + && self.old_local.is_empty() + && self.new_local.is_empty() + } + fn clear(&mut self) { + self.subtree_id_to_index.clear(); + self.subtree_solutions.clear(); + self.old_local.clear(); + self.new_local.clear(); + } + pub(crate) fn reset(&mut self, subtree_ids: impl Iterator) { + self.subtree_id_to_index.clear(); + self.subtree_solutions.clear(); + self.old_local.clear(); + self.new_local.clear(); + for key in subtree_ids { + self.subtree_id_to_index.insert(key, self.subtree_solutions.len()); + self.subtree_solutions.push(Default::default()) + } + } -// impl Connector { -// fn end_round_with_decision(&mut self, decision: Decision) -> Result<(), SyncError> { -// log!(&mut self.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); -// let ret = match &decision { -// Decision::Success(predicate) => { -// // overwrite MonoN/P -// self.mono_n = { -// let poly_n = self.ephemeral.poly_n.take().unwrap(); -// poly_n.choose_mono(predicate).unwrap_or_else(|| { -// panic!( -// "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}", -// &predicate, &poly_n.branches, &self.logger -// ); -// }) -// }; -// self.mono_ps.clear(); -// self.mono_ps.extend( -// self.ephemeral -// .poly_ps -// .drain(..) -// .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), -// ); -// Ok(()) -// } -// Decision::Failure => Err(SyncError::Timeout), -// }; -// let announcement = CommMsgContents::Announce { decision }.into_msg(self.round_index); -// for &child_port in self.family.children_ports.iter() { -// log!( -// &mut self.logger, -// "Forwarding {:?} to child with port {:?}", -// &announcement, -// child_port -// ); -// self -// .endpoint_exts -// .get_mut(child_port) -// .expect("eefef") -// .endpoint -// .send(announcement.clone())?; -// } -// self.round_index += 1; -// self.ephemeral.clear(); -// ret -// } - -// // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found -// fn handle_locals_maybe_decide(&mut self) -> Result { -// if let Some(parent_port) = self.family.parent_port { -// // I have a parent -> I'm not the leader -// let parent_endpoint = -// &mut self.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; -// for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { -// let msg = -// CommMsgContents::Elaborate { partial_oracle }.into_msg(self.round_index); -// log!(&mut self.logger, "Sending {:?} to parent {:?}", &msg, parent_port); -// parent_endpoint.send(msg)?; -// } -// Ok(false) -// } else { -// // I have no parent -> I'm the leader -// assert!(self.family.parent_port.is_none()); -// let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); -// Ok(if let Some(predicate) = maybe_predicate { -// let decision = Decision::Success(predicate); -// log!(&mut self.logger, "DECIDE ON {:?} AS LEADER!", &decision); -// self.end_round_with_decision(decision)?; -// true -// } else { -// false -// }) -// } -// } - -// fn kick_off_native( -// &mut self, -// sync_batches: impl Iterator, -// ) -> Result { -// let MonoN { ports, .. } = self.mono_n.clone(); -// let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; -// let mut branches = HashMap::<_, _>::default(); -// for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { -// let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id; -// let all_ports = ports.iter().copied(); -// let all_channel_ids = all_ports.map(port_to_channel_id); - -// let mut predicate = Predicate::new_trivial(); - -// // assign TRUE for puts and gets -// let true_ports = puts.keys().chain(gets.iter()).copied(); -// let true_channel_ids = true_ports.clone().map(port_to_channel_id); -// predicate.batch_assign_nones(true_channel_ids, true); - -// // assign FALSE for all in interface not assigned true -// predicate.batch_assign_nones(all_channel_ids.clone(), false); - -// if branches.contains_key(&predicate) { -// // TODO what do I do with redundant predicates? -// unimplemented!( -// "Duplicate predicate {:#?}!\nHaving multiple batches with the same -// predicate requires the support of oracle boolean variables", -// &predicate, -// ) -// } -// let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; -// for (port, payload) in puts { -// log!( -// &mut self.logger, -// "... ... Initial native put msg {:?} pred {:?} batch {:?}", -// &payload, -// &predicate, -// sync_batch_index, -// ); -// let msg = -// CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } -// .into_msg(*round_index); -// endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; -// } -// log!( -// &mut self.logger, -// "... Initial native branch batch index={} with pred {:?}", -// sync_batch_index, -// &predicate -// ); -// if branch.to_get.is_empty() { -// self.ephemeral.solution_storage.submit_and_digest_subtree_solution( -// &mut self.logger, -// SubtreeId::PolyN, -// predicate.clone(), -// ); -// } -// branches.insert(predicate, branch); -// } -// Ok(PolyN { ports, branches }) -// } -// pub fn sync_round( -// &mut self, -// deadline: Option, -// sync_batches: Option>, -// ) -> Result<(), SyncError> { -// if let Some(e) = self.unrecoverable_error { -// return Err(e.clone()); -// } -// self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e { -// SyncError::Timeout => e, // this isn't unrecoverable -// _ => { -// // Must set unrecoverable error! and tear down our net channels -// self.unrecoverable_error = Some(e); -// self.ephemeral.clear(); -// self.endpoint_exts = Default::default(); -// e -// } -// }) -// } - -// // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. -// // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. -// fn sync_round_inner( -// &mut self, -// mut deadline: Option, -// sync_batches: Option>, -// ) -> Result<(), SyncError> { -// log!( -// &mut self.logger, -// "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", -// self.round_index -// ); -// assert!(self.ephemeral.is_clear()); -// assert!(self.unrecoverable_error.is_none()); - -// // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). -// // Some actors are dropped. some new actors are created. -// // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors -// self.ephemeral.mono_ps.extend(self.mono_ps.iter().cloned()); -// log!(&mut self.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); -// while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { -// let mut m_ctx = ProtoSyncContext { -// ports: &mut mono_p.ports, -// mono_ps: &mut self.ephemeral.mono_ps, -// inner: &mut self, -// }; -// // cross boundary into crate::protocol -// let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); -// log!(&mut self.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); -// match blocker { -// NonsyncBlocker::Inconsistent => return Err(SyncError::Inconsistent), -// NonsyncBlocker::ComponentExit => drop(mono_p), -// NonsyncBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()), -// } -// } -// log!( -// &mut self.logger, -// "Finished running all MonoPs! Have {} PolyPs waiting", -// self.ephemeral.poly_ps.len() -// ); - -// // 3. define the mapping from port -> actor -// // this is needed during the event loop to determine which actor -// // should receive the incoming message. -// // TODO: store and update this mapping rather than rebuilding it each round. -// let port_to_holder: HashMap = { -// use PolyId::*; -// let n = self.mono_n.ports.iter().map(move |&e| (e, N)); -// let p = self -// .ephemeral -// .poly_ps -// .iter() -// .enumerate() -// .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index }))); -// n.chain(p).collect() -// }; -// log!( -// &mut self.logger, -// "SET OF PolyPs and MonoPs final! port lookup map is {:?}", -// &port_to_holder -// ); - -// // 4. Create the solution storage. it tracks the solutions of "subtrees" -// // of the controller in the overlay tree. -// self.ephemeral.solution_storage.reset({ -// let n = std::iter::once(SubtreeId::PolyN); -// let m = (0..self.ephemeral.poly_ps.len()).map(|index| SubtreeId::PolyP { index }); -// let c = self -// -// .family -// .children_ports -// .iter() -// .map(|&port| SubtreeId::ChildController { port }); -// let subtree_id_iter = n.chain(m).chain(c); -// log!( -// &mut self.logger, -// "Solution Storage has subtree Ids: {:?}", -// &subtree_id_iter.clone().collect::>() -// ); -// subtree_id_iter -// }); - -// // 5. kick off the synchronous round of the native actor if it exists - -// log!(&mut self.logger, "Kicking off native's synchronous round..."); -// self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { -// // using if let because of nested ? operator -// // TODO check that there are 1+ branches or NO SOLUTION -// let poly_n = self.kick_off_native(sync_batches)?; -// log!( -// &mut self.logger, -// "PolyN kicked off, and has branches with predicates... {:?}", -// poly_n.branches.keys().collect::>() -// ); -// Some(poly_n) -// } else { -// log!(&mut self.logger, "NO NATIVE COMPONENT"); -// None -// }; + pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { + self.new_local.iter() + } -// // 6. Kick off the synchronous round of each protocol actor -// // If just one actor becomes inconsistent now, there can be no solution! -// // TODO distinguish between completed and not completed poly_p's? -// log!(&mut self.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); -// for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { -// let my_subtree_id = SubtreeId::PolyP { index }; -// let m_ctx = PolyPContext { -// my_subtree_id, -// inner: &mut self, -// solution_storage: &mut self.ephemeral.solution_storage, -// }; -// use SyncRunResult as Srr; -// let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; -// log!(&mut self.logger, "... PolyP's poly_run got blocker {:?}", &blocker); -// match blocker { -// Srr::NoBranches => return Err(SyncError::Inconsistent), -// Srr::AllBranchesComplete | Srr::BlockingForRecv => (), -// } -// } -// log!(&mut self.logger, "All Poly machines have been kicked off!"); + pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { + let Self { old_local, new_local, .. } = self; + new_local.drain().map(move |local| { + old_local.insert(local.clone()); + local + }) + } -// // 7. `solution_storage` may have new solutions for this controller -// // handle their discovery. LEADER => announce, otherwise => send to parent -// { -// let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); -// log!( -// &mut self.logger, -// "Got {} controller-local solutions before a single RECV: {:?}", -// peeked.len(), -// peeked -// ); -// } -// if self.handle_locals_maybe_decide()? { -// return Ok(()); -// } + pub(crate) fn submit_and_digest_subtree_solution( + &mut self, + logger: &mut dyn Logger, + subtree_id: Route, + predicate: Predicate, + ) { + log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate); + let index = self.subtree_id_to_index[&subtree_id]; + let left = 0..index; + let right = (index + 1)..self.subtree_solutions.len(); + + let Self { subtree_solutions, new_local, old_local, .. } = self; + let was_new = subtree_solutions[index].insert(predicate.clone()); + if was_new { + let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); + Self::elaborate_into_new_local_rec( + logger, + predicate, + set_visitor, + old_local, + new_local, + ); + } + } -// // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error -// log!(&mut self.logger, "`No decision yet`. Time to recv messages"); -// self.undelay_all(); -// 'recv_loop: loop { -// log!(&mut self.logger, "`POLLING` with deadline {:?}...", deadline); -// let received = match deadline { -// None => { -// // we have personally timed out. perform a "long" poll. -// self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP") -// } -// Some(d) => match self.recv(d)? { -// // we have not yet timed out. performed a time-limited poll -// Some(received) => received, -// None => { -// // timed out! send a FAILURE message to the sink, -// // and henceforth don't time out on polling. -// deadline = None; -// match self.family.parent_port { -// None => { -// // I am the sink! announce failure and return. -// return self.end_round_with_decision(Decision::Failure); -// } -// Some(parent_port) => { -// // I am not the sink! send a failure message. -// let announcement = Msg::CommMsg(CommMsg { -// round_index: self.round_index, -// contents: CommMsgContents::Failure, -// }); -// log!( -// &mut self.logger, -// "Forwarding {:?} to parent with port {:?}", -// &announcement, -// parent_port -// ); -// self -// .endpoint_exts -// .get_mut(parent_port) -// .expect("ss") -// .endpoint -// .send(announcement.clone())?; -// continue; // poll some more -// } -// } -// } -// }, -// }; -// log!(&mut self.logger, "::: message {:?}...", &received); -// let current_content = match received.msg { -// Msg::SetupMsg(s) => { -// // This occurs in the event the connector was malformed during connect() -// println!("WASNT EXPECTING {:?}", s); -// return Err(SyncError::UnexpectedSetupMsg); -// } -// Msg::CommMsg(CommMsg { round_index, .. }) -// if round_index < self.round_index => -// { -// // Old message! Can safely discard -// log!(&mut self.logger, "...and its OLD! :("); -// drop(received); -// continue 'recv_loop; -// } -// Msg::CommMsg(CommMsg { round_index, .. }) -// if round_index > self.round_index => -// { -// // Message from a next round. Keep for later! -// log!(&mut self.logger, "... DELAY! :("); -// self.delay(received); -// continue 'recv_loop; -// } -// Msg::CommMsg(CommMsg { contents, round_index }) => { -// log!( -// &mut self.logger, -// "... its a round-appropriate CommMsg with port {:?}", -// received.recipient -// ); -// assert_eq!(round_index, self.round_index); -// contents -// } -// }; -// match current_content { -// CommMsgContents::Failure => match self.family.parent_port { -// Some(parent_port) => { -// let announcement = Msg::CommMsg(CommMsg { -// round_index: self.round_index, -// contents: CommMsgContents::Failure, -// }); -// log!( -// &mut self.logger, -// "Forwarding {:?} to parent with port {:?}", -// &announcement, -// parent_port -// ); -// self -// .endpoint_exts -// .get_mut(parent_port) -// .expect("ss") -// .endpoint -// .send(announcement.clone())?; -// } -// None => return self.end_round_with_decision(Decision::Failure), -// }, -// CommMsgContents::Elaborate { partial_oracle } => { -// // Child controller submitted a subtree solution. -// if !self.family.children_ports.contains(&received.recipient) { -// return Err(SyncError::ElaborateFromNonChild); -// } -// let subtree_id = SubtreeId::ChildController { port: received.recipient }; -// log!( -// &mut self.logger, -// "Received elaboration from child for subtree {:?}: {:?}", -// subtree_id, -// &partial_oracle -// ); -// self.ephemeral.solution_storage.submit_and_digest_subtree_solution( -// &mut self.logger, -// subtree_id, -// partial_oracle, -// ); -// if self.handle_locals_maybe_decide()? { -// return Ok(()); -// } -// } -// CommMsgContents::Announce { decision } => { -// if self.family.parent_port != Some(received.recipient) { -// return Err(SyncError::AnnounceFromNonParent); -// } -// log!( -// &mut self.logger, -// "Received ANNOUNCEMENT from from parent {:?}: {:?}", -// received.recipient, -// &decision -// ); -// return self.end_round_with_decision(decision); -// } -// CommMsgContents::SendPayload { payload_predicate, payload } => { -// // check that we expect to be able to receive payloads from this sender -// assert_eq!( -// Getter, -// self.endpoint_exts.get(received.recipient).unwrap().info.polarity -// ); + fn elaborate_into_new_local_rec<'a, 'b>( + logger: &mut dyn Logger, + partial: Predicate, + mut set_visitor: impl Iterator> + Clone, + old_local: &'b HashSet, + new_local: &'a mut HashSet, + ) { + if let Some(set) = set_visitor.next() { + // incomplete solution. keep traversing + for pred in set.iter() { + if let Some(elaborated) = pred.union_with(&partial) { + Self::elaborate_into_new_local_rec( + logger, + elaborated, + set_visitor.clone(), + old_local, + new_local, + ) + } + } + } else { + // recursive stop condition. `partial` is a local subtree solution + if !old_local.contains(&partial) { + // ... and it hasn't been found before + log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial); + new_local.insert(partial); + } + } + } +} -// // message for some actor. Feed it to the appropriate actor -// // and then give them another chance to run. -// let subtree_id = port_to_holder.get(&received.recipient); -// log!( -// &mut self.logger, -// "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", -// subtree_id, -// &payload_predicate, -// &payload -// ); -// let channel_id = self -// -// .endpoint_exts -// .get(received.recipient) -// .expect("UEHFU") -// .info -// .channel_id; -// if payload_predicate.query(channel_id) != Some(true) { -// // sender didn't preserve the invariant -// return Err(SyncError::PayloadPremiseExcludesTheChannel(channel_id)); -// } -// match subtree_id { -// None => { -// // this happens when a message is sent to a component that has exited. -// // It's safe to drop this message; -// // The sender branch will certainly not be part of the solution -// } -// Some(PolyId::N) => { -// // Message for NativeMachine -// self.ephemeral.poly_n.as_mut().unwrap().sync_recv( -// received.recipient, -// &mut self.logger, -// payload, -// payload_predicate, -// &mut self.ephemeral.solution_storage, -// ); -// if self.handle_locals_maybe_decide()? { -// return Ok(()); -// } -// } -// Some(PolyId::P { index }) => { -// // Message for protocol actor -// let poly_p = &mut self.ephemeral.poly_ps[*index]; - -// let m_ctx = PolyPContext { -// my_subtree_id: SubtreeId::PolyP { index: *index }, -// inner: &mut self, -// solution_storage: &mut self.ephemeral.solution_storage, -// }; -// use SyncRunResult as Srr; -// let blocker = poly_p.poly_recv_run( -// m_ctx, -// &self.protocol_description, -// received.recipient, -// payload_predicate, -// payload, -// )?; -// log!( -// &mut self.logger, -// "... Fed the msg to PolyP {:?} and ran it to blocker {:?}", -// subtree_id, -// blocker -// ); -// match blocker { -// Srr::NoBranches => return Err(SyncError::Inconsistent), -// Srr::BlockingForRecv | Srr::AllBranchesComplete => { -// { -// let peeked = self -// .ephemeral -// .solution_storage -// .peek_new_locals() -// .collect::>(); -// log!( -// &mut self.logger, -// "Got {} new controller-local solutions from RECV: {:?}", -// peeked.len(), -// peeked -// ); -// } -// if self.handle_locals_maybe_decide()? { -// return Ok(()); -// } -// } -// } -// } -// }; -// } -// } -// } -// } -// } // impl ControllerEphemeral { // fn is_clear(&self) -> bool { // self.solution_storage.is_clear() @@ -702,97 +413,6 @@ impl Connector { // } // } -// impl SolutionStorage { -// fn is_clear(&self) -> bool { -// self.subtree_id_to_index.is_empty() -// && self.subtree_solutions.is_empty() -// && self.old_local.is_empty() -// && self.new_local.is_empty() -// } -// fn clear(&mut self) { -// self.subtree_id_to_index.clear(); -// self.subtree_solutions.clear(); -// self.old_local.clear(); -// self.new_local.clear(); -// } -// pub(crate) fn reset(&mut self, subtree_ids: impl Iterator) { -// self.subtree_id_to_index.clear(); -// self.subtree_solutions.clear(); -// self.old_local.clear(); -// self.new_local.clear(); -// for key in subtree_ids { -// self.subtree_id_to_index.insert(key, self.subtree_solutions.len()); -// self.subtree_solutions.push(Default::default()) -// } -// } - -// pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { -// self.new_local.iter() -// } - -// pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { -// let Self { old_local, new_local, .. } = self; -// new_local.drain().map(move |local| { -// old_local.insert(local.clone()); -// local -// }) -// } - -// pub(crate) fn submit_and_digest_subtree_solution( -// &mut self, -// logger: &mut String, -// subtree_id: SubtreeId, -// predicate: Predicate, -// ) { -// log!(logger, "NEW COMPONENT SOLUTION {:?} {:?}", subtree_id, &predicate); -// let index = self.subtree_id_to_index[&subtree_id]; -// let left = 0..index; -// let right = (index + 1)..self.subtree_solutions.len(); - -// let Self { subtree_solutions, new_local, old_local, .. } = self; -// let was_new = subtree_solutions[index].insert(predicate.clone()); -// if was_new { -// let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); -// Self::elaborate_into_new_local_rec( -// logger, -// predicate, -// set_visitor, -// old_local, -// new_local, -// ); -// } -// } - -// fn elaborate_into_new_local_rec<'a, 'b>( -// logger: &mut String, -// partial: Predicate, -// mut set_visitor: impl Iterator> + Clone, -// old_local: &'b HashSet, -// new_local: &'a mut HashSet, -// ) { -// if let Some(set) = set_visitor.next() { -// // incomplete solution. keep traversing -// for pred in set.iter() { -// if let Some(elaborated) = pred.union_with(&partial) { -// Self::elaborate_into_new_local_rec( -// logger, -// elaborated, -// set_visitor.clone(), -// old_local, -// new_local, -// ) -// } -// } -// } else { -// // recursive stop condition. `partial` is a local subtree solution -// if !old_local.contains(&partial) { -// // ... and it hasn't been found before -// log!(logger, "... storing NEW LOCAL SOLUTION {:?}", &partial); -// new_local.insert(partial); -// } -// } -// } -// } // impl PolyContext for BranchPContext<'_, '_> { // type D = ProtocolD; @@ -820,3 +440,507 @@ impl Connector { // val // } // } + +////////////// + +// impl Connector { +// fn end_round_with_decision(&mut self, decision: Decision) -> Result { +// log!(&mut self.logger, "ENDING ROUND WITH DECISION! {:?}", &decision); +// let ret = match &decision { +// Decision::Success(predicate) => { +// // overwrite MonoN/P +// self.mono_n = { +// let poly_n = self.ephemeral.poly_n.take().unwrap(); +// poly_n.choose_mono(predicate).unwrap_or_else(|| { +// panic!( +// "Ending round with decision pred {:#?} but poly_n has branches {:#?}. My log is... {}", +// &predicate, &poly_n.branches, &self.logger +// ); +// }) +// }; +// self.mono_ps.clear(); +// self.mono_ps.extend( +// self.ephemeral +// .poly_ps +// .drain(..) +// .map(|poly_p| poly_p.choose_mono(predicate).unwrap()), +// ); +// Ok(()) +// } +// Decision::Failure => Err(SyncError::Timeout), +// }; +// let announcement = CommMsgContents::Announce { decision }.into_msg(self.round_index); +// for &child_port in self.family.children_ports.iter() { +// log!( +// &mut self.logger, +// "Forwarding {:?} to child with port {:?}", +// &announcement, +// child_port +// ); +// self.endpoint_exts +// .get_mut(child_port) +// .expect("eefef") +// .endpoint +// .send(announcement.clone())?; +// } +// self.round_index += 1; +// self.ephemeral.clear(); +// ret +// } + +// // Drain self.ephemeral.solution_storage and handle the new locals. Return decision if one is found +// fn handle_locals_maybe_decide(&mut self) -> Result { +// if let Some(parent_port) = self.family.parent_port { +// // I have a parent -> I'm not the leader +// let parent_endpoint = +// &mut self.endpoint_exts.get_mut(parent_port).expect("huu").endpoint; +// for partial_oracle in self.ephemeral.solution_storage.iter_new_local_make_old() { +// let msg = CommMsgContents::Elaborate { partial_oracle }.into_msg(self.round_index); +// log!(&mut self.logger, "Sending {:?} to parent {:?}", &msg, parent_port); +// parent_endpoint.send(msg)?; +// } +// Ok(false) +// } else { +// // I have no parent -> I'm the leader +// assert!(self.family.parent_port.is_none()); +// let maybe_predicate = self.ephemeral.solution_storage.iter_new_local_make_old().next(); +// Ok(if let Some(predicate) = maybe_predicate { +// let decision = Decision::Success(predicate); +// log!(&mut self.logger, "DECIDE ON {:?} AS LEADER!", &decision); +// self.end_round_with_decision(decision)?; +// true +// } else { +// false +// }) +// } +// } + +// fn kick_off_native( +// &mut self, +// sync_batches: impl Iterator, +// ) -> Result { +// let MonoN { ports, .. } = self.mono_n.clone(); +// let Self { inner: ControllerInner { endpoint_exts, round_index, .. }, .. } = self; +// let mut branches = HashMap::<_, _>::default(); +// for (sync_batch_index, SyncBatch { puts, gets }) in sync_batches.enumerate() { +// let port_to_channel_id = |port| endpoint_exts.get(port).unwrap().info.channel_id; +// let all_ports = ports.iter().copied(); +// let all_channel_ids = all_ports.map(port_to_channel_id); + +// let mut predicate = Predicate::new_trivial(); + +// // assign TRUE for puts and gets +// let true_ports = puts.keys().chain(gets.iter()).copied(); +// let true_channel_ids = true_ports.clone().map(port_to_channel_id); +// predicate.batch_assign_nones(true_channel_ids, true); + +// // assign FALSE for all in interface not assigned true +// predicate.batch_assign_nones(all_channel_ids.clone(), false); + +// if branches.contains_key(&predicate) { +// // TODO what do I do with redundant predicates? +// unimplemented!( +// "Duplicate predicate {:#?}!\nHaving multiple batches with the same +// predicate requires the support of oracle boolean variables", +// &predicate, +// ) +// } +// let branch = BranchN { to_get: gets, gotten: Default::default(), sync_batch_index }; +// for (port, payload) in puts { +// log!( +// &mut self.logger, +// "... ... Initial native put msg {:?} pred {:?} batch {:?}", +// &payload, +// &predicate, +// sync_batch_index, +// ); +// let msg = +// CommMsgContents::SendPayload { payload_predicate: predicate.clone(), payload } +// .into_msg(*round_index); +// endpoint_exts.get_mut(port).unwrap().endpoint.send(msg)?; +// } +// log!( +// &mut self.logger, +// "... Initial native branch batch index={} with pred {:?}", +// sync_batch_index, +// &predicate +// ); +// if branch.to_get.is_empty() { +// self.ephemeral.solution_storage.submit_and_digest_subtree_solution( +// &mut self.logger, +// Route::PolyN, +// predicate.clone(), +// ); +// } +// branches.insert(predicate, branch); +// } +// Ok(PolyN { ports, branches }) +// } +// pub fn sync_round( +// &mut self, +// deadline: Option, +// sync_batches: Option>, +// ) -> Result<(), SyncError> { +// if let Some(e) = self.unrecoverable_error { +// return Err(e.clone()); +// } +// self.sync_round_inner(deadline, sync_batches).map_err(move |e| match e { +// SyncError::Timeout => e, // this isn't unrecoverable +// _ => { +// // Must set unrecoverable error! and tear down our net channels +// self.unrecoverable_error = Some(e); +// self.ephemeral.clear(); +// self.endpoint_exts = Default::default(); +// e +// } +// }) +// } + +// // Runs a synchronous round until all the actors are in decided state OR 1+ are inconsistent. +// // If a native requires setting up, arg `sync_batches` is Some, and those are used as the sync batches. +// fn sync_round_inner( +// &mut self, +// mut deadline: Option, +// sync_batches: Option>, +// ) -> Result<(), SyncError> { +// log!(&mut self.logger, "~~~~~~~~ SYNC ROUND STARTS! ROUND={} ~~~~~~~~~", self.round_index); +// assert!(self.ephemeral.is_clear()); +// assert!(self.unrecoverable_error.is_none()); + +// // 1. Run the Mono for each Mono actor (stored in `self.mono_ps`). +// // Some actors are dropped. some new actors are created. +// // Ultimately, we have 0 Mono actors and a list of unnamed sync_actors +// self.ephemeral.mono_ps.extend(self.mono_ps.iter().cloned()); +// log!(&mut self.logger, "Got {} MonoP's to run!", self.ephemeral.mono_ps.len()); +// while let Some(mut mono_p) = self.ephemeral.mono_ps.pop() { +// let mut m_ctx = ProtoSyncContext { +// ports: &mut mono_p.ports, +// mono_ps: &mut self.ephemeral.mono_ps, +// inner: &mut self, +// }; +// // cross boundary into crate::protocol +// let blocker = mono_p.state.pre_sync_run(&mut m_ctx, &self.protocol_description); +// log!(&mut self.logger, "... MonoP's pre_sync_run got blocker {:?}", &blocker); +// match blocker { +// NonsyncBlocker::Inconsistent => return Err(SyncError::Inconsistent), +// NonsyncBlocker::ComponentExit => drop(mono_p), +// NonsyncBlocker::SyncBlockStart => self.ephemeral.poly_ps.push(mono_p.into()), +// } +// } +// log!( +// &mut self.logger, +// "Finished running all MonoPs! Have {} PolyPs waiting", +// self.ephemeral.poly_ps.len() +// ); + +// // 3. define the mapping from port -> actor +// // this is needed during the event loop to determine which actor +// // should receive the incoming message. +// // TODO: store and update this mapping rather than rebuilding it each round. +// let port_to_holder: HashMap = { +// use PolyId::*; +// let n = self.mono_n.ports.iter().map(move |&e| (e, N)); +// let p = self +// .ephemeral +// .poly_ps +// .iter() +// .enumerate() +// .flat_map(|(index, m)| m.ports.iter().map(move |&e| (e, P { index }))); +// n.chain(p).collect() +// }; +// log!( +// &mut self.logger, +// "SET OF PolyPs and MonoPs final! port lookup map is {:?}", +// &port_to_holder +// ); + +// // 4. Create the solution storage. it tracks the solutions of "subtrees" +// // of the controller in the overlay tree. +// self.ephemeral.solution_storage.reset({ +// let n = std::iter::once(Route::PolyN); +// let m = (0..self.ephemeral.poly_ps.len()).map(|index| Route::PolyP { index }); +// let c = self.family.children_ports.iter().map(|&port| Route::ChildController { port }); +// let subtree_id_iter = n.chain(m).chain(c); +// log!( +// &mut self.logger, +// "Solution Storage has subtree Ids: {:?}", +// &subtree_id_iter.clone().collect::>() +// ); +// subtree_id_iter +// }); + +// // 5. kick off the synchronous round of the native actor if it exists + +// log!(&mut self.logger, "Kicking off native's synchronous round..."); +// self.ephemeral.poly_n = if let Some(sync_batches) = sync_batches { +// // using if let because of nested ? operator +// // TODO check that there are 1+ branches or NO SOLUTION +// let poly_n = self.kick_off_native(sync_batches)?; +// log!( +// &mut self.logger, +// "PolyN kicked off, and has branches with predicates... {:?}", +// poly_n.branches.keys().collect::>() +// ); +// Some(poly_n) +// } else { +// log!(&mut self.logger, "NO NATIVE COMPONENT"); +// None +// }; + +// // 6. Kick off the synchronous round of each protocol actor +// // If just one actor becomes inconsistent now, there can be no solution! +// // TODO distinguish between completed and not completed poly_p's? +// log!(&mut self.logger, "Kicking off {} PolyP's.", self.ephemeral.poly_ps.len()); +// for (index, poly_p) in self.ephemeral.poly_ps.iter_mut().enumerate() { +// let my_subtree_id = Route::PolyP { index }; +// let m_ctx = PolyPContext { +// my_subtree_id, +// inner: &mut self, +// solution_storage: &mut self.ephemeral.solution_storage, +// }; +// use SyncRunResult as Srr; +// let blocker = poly_p.poly_run(m_ctx, &self.protocol_description)?; +// log!(&mut self.logger, "... PolyP's poly_run got blocker {:?}", &blocker); +// match blocker { +// Srr::NoBranches => return Err(SyncError::Inconsistent), +// Srr::AllBranchesComplete | Srr::BlockingForRecv => (), +// } +// } +// log!(&mut self.logger, "All Poly machines have been kicked off!"); + +// // 7. `solution_storage` may have new solutions for this controller +// // handle their discovery. LEADER => announce, otherwise => send to parent +// { +// let peeked = self.ephemeral.solution_storage.peek_new_locals().collect::>(); +// log!( +// &mut self.logger, +// "Got {} controller-local solutions before a single RECV: {:?}", +// peeked.len(), +// peeked +// ); +// } +// if self.handle_locals_maybe_decide()? { +// return Ok(()); +// } + +// // 4. Receive incoming messages until the DECISION is made OR some unrecoverable error +// log!(&mut self.logger, "`No decision yet`. Time to recv messages"); +// self.undelay_all(); +// 'recv_loop: loop { +// log!(&mut self.logger, "`POLLING` with deadline {:?}...", deadline); +// let received = match deadline { +// None => { +// // we have personally timed out. perform a "long" poll. +// self.recv(Instant::now() + Duration::from_secs(10))?.expect("DRIED UP") +// } +// Some(d) => match self.recv(d)? { +// // we have not yet timed out. performed a time-limited poll +// Some(received) => received, +// None => { +// // timed out! send a FAILURE message to the sink, +// // and henceforth don't time out on polling. +// deadline = None; +// match self.family.parent_port { +// None => { +// // I am the sink! announce failure and return. +// return self.end_round_with_decision(Decision::Failure); +// } +// Some(parent_port) => { +// // I am not the sink! send a failure message. +// let announcement = Msg::CommMsg(CommMsg { +// round_index: self.round_index, +// contents: CommMsgContents::Failure, +// }); +// log!( +// &mut self.logger, +// "Forwarding {:?} to parent with port {:?}", +// &announcement, +// parent_port +// ); +// self.endpoint_exts +// .get_mut(parent_port) +// .expect("ss") +// .endpoint +// .send(announcement.clone())?; +// continue; // poll some more +// } +// } +// } +// }, +// }; +// log!(&mut self.logger, "::: message {:?}...", &received); +// let current_content = match received.msg { +// Msg::SetupMsg(s) => { +// // This occurs in the event the connector was malformed during connect() +// println!("WASNT EXPECTING {:?}", s); +// return Err(SyncError::UnexpectedSetupMsg); +// } +// Msg::CommMsg(CommMsg { round_index, .. }) if round_index < self.round_index => { +// // Old message! Can safely discard +// log!(&mut self.logger, "...and its OLD! :("); +// drop(received); +// continue 'recv_loop; +// } +// Msg::CommMsg(CommMsg { round_index, .. }) if round_index > self.round_index => { +// // Message from a next round. Keep for later! +// log!(&mut self.logger, "... DELAY! :("); +// self.delay(received); +// continue 'recv_loop; +// } +// Msg::CommMsg(CommMsg { contents, round_index }) => { +// log!( +// &mut self.logger, +// "... its a round-appropriate CommMsg with port {:?}", +// received.recipient +// ); +// assert_eq!(round_index, self.round_index); +// contents +// } +// }; +// match current_content { +// CommMsgContents::Failure => match self.family.parent_port { +// Some(parent_port) => { +// let announcement = Msg::CommMsg(CommMsg { +// round_index: self.round_index, +// contents: CommMsgContents::Failure, +// }); +// log!( +// &mut self.logger, +// "Forwarding {:?} to parent with port {:?}", +// &announcement, +// parent_port +// ); +// self.endpoint_exts +// .get_mut(parent_port) +// .expect("ss") +// .endpoint +// .send(announcement.clone())?; +// } +// None => return self.end_round_with_decision(Decision::Failure), +// }, +// CommMsgContents::Elaborate { partial_oracle } => { +// // Child controller submitted a subtree solution. +// if !self.family.children_ports.contains(&received.recipient) { +// return Err(SyncError::ElaborateFromNonChild); +// } +// let subtree_id = Route::ChildController { port: received.recipient }; +// log!( +// &mut self.logger, +// "Received elaboration from child for subtree {:?}: {:?}", +// subtree_id, +// &partial_oracle +// ); +// self.ephemeral.solution_storage.submit_and_digest_subtree_solution( +// &mut self.logger, +// subtree_id, +// partial_oracle, +// ); +// if self.handle_locals_maybe_decide()? { +// return Ok(()); +// } +// } +// CommMsgContents::Announce { decision } => { +// if self.family.parent_port != Some(received.recipient) { +// return Err(SyncError::AnnounceFromNonParent); +// } +// log!( +// &mut self.logger, +// "Received ANNOUNCEMENT from from parent {:?}: {:?}", +// received.recipient, +// &decision +// ); +// return self.end_round_with_decision(decision); +// } +// CommMsgContents::SendPayload { payload_predicate, payload } => { +// // check that we expect to be able to receive payloads from this sender +// assert_eq!( +// Getter, +// self.endpoint_exts.get(received.recipient).unwrap().info.polarity +// ); + +// // message for some actor. Feed it to the appropriate actor +// // and then give them another chance to run. +// let subtree_id = port_to_holder.get(&received.recipient); +// log!( +// &mut self.logger, +// "Received SendPayload for subtree {:?} with pred {:?} and payload {:?}", +// subtree_id, +// &payload_predicate, +// &payload +// ); +// let channel_id = +// self.endpoint_exts.get(received.recipient).expect("UEHFU").info.channel_id; +// if payload_predicate.query(channel_id) != Some(true) { +// // sender didn't preserve the invariant +// return Err(SyncError::PayloadPremiseExcludesTheChannel(channel_id)); +// } +// match subtree_id { +// None => { +// // this happens when a message is sent to a component that has exited. +// // It's safe to drop this message; +// // The sender branch will certainly not be part of the solution +// } +// Some(PolyId::N) => { +// // Message for NativeMachine +// self.ephemeral.poly_n.as_mut().unwrap().sync_recv( +// received.recipient, +// &mut self.logger, +// payload, +// payload_predicate, +// &mut self.ephemeral.solution_storage, +// ); +// if self.handle_locals_maybe_decide()? { +// return Ok(()); +// } +// } +// Some(PolyId::P { index }) => { +// // Message for protocol actor +// let poly_p = &mut self.ephemeral.poly_ps[*index]; + +// let m_ctx = PolyPContext { +// my_subtree_id: Route::PolyP { index: *index }, +// inner: &mut self, +// solution_storage: &mut self.ephemeral.solution_storage, +// }; +// use SyncRunResult as Srr; +// let blocker = poly_p.poly_recv_run( +// m_ctx, +// &self.protocol_description, +// received.recipient, +// payload_predicate, +// payload, +// )?; +// log!( +// &mut self.logger, +// "... Fed the msg to PolyP {:?} and ran it to blocker {:?}", +// subtree_id, +// blocker +// ); +// match blocker { +// Srr::NoBranches => return Err(SyncError::Inconsistent), +// Srr::BlockingForRecv | Srr::AllBranchesComplete => { +// { +// let peeked = self +// .ephemeral +// .solution_storage +// .peek_new_locals() +// .collect::>(); +// log!( +// &mut self.logger, +// "Got {} new controller-local solutions from RECV: {:?}", +// peeked.len(), +// peeked +// ); +// } +// if self.handle_locals_maybe_decide()? { +// return Ok(()); +// } +// } +// } +// } +// }; +// } +// } +// } +// } +// } diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 15b2bc12c0cb13ac0c1190ea5102044f13fbe5b9..07bf1cb62b5e520d2d9356c9bc475e1cc4f7d19b 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -8,12 +8,12 @@ mod my_tests; use crate::common::*; use error::*; -#[derive(Clone, Copy, Debug)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] pub enum LocalComponentId { Native, Proto(ProtoComponentId), } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] pub enum Route { LocalComponent(LocalComponentId), Endpoint { index: usize }, @@ -172,9 +172,10 @@ pub struct NonsyncProtoContext<'a> { unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>, } pub struct SyncProtoContext<'a> { + logger: &'a mut dyn Logger, predicate: &'a Predicate, proto_component_id: ProtoComponentId, - inbox: HashMap, + inbox: &'a HashMap, } // pub struct MonoPContext<'a> { @@ -201,14 +202,6 @@ pub struct SyncProtoContext<'a> { // inbox: &'r HashMap, // } -// #[derive(Default)] -// pub struct SolutionStorage { -// old_local: HashSet, -// new_local: HashSet, -// // this pair acts as SubtreeId -> HashSet which is friendlier to iteration -// subtree_solutions: Vec>, -// subtree_id_to_index: HashMap, -// } // #[derive(Debug)] // pub enum SyncRunResult { // BlockingForRecv, @@ -432,6 +425,11 @@ impl Connector { // } impl Predicate { + #[inline] + pub fn inserted(mut self, k: PortId, v: bool) -> Self { + self.assigned.insert(k, v); + self + } // returns true IFF self.unify would return Equivalent OR FormerNotLatter pub fn satisfies(&self, other: &Self) -> bool { let mut s_it = self.assigned.iter(); @@ -530,9 +528,9 @@ impl Predicate { self.assigned.entry(channel_id).or_insert(value); } } - pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option { - self.assigned.insert(channel_id, value) - } + // pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option { + // self.assigned.insert(channel_id, value) + // } pub fn union_with(&self, other: &Self) -> Option { let mut res = self.clone(); for (&channel_id, &assignment_1) in other.assigned.iter() {