diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 20d756b80a66a1c6ff7ea04e8276039df12f1443..327b5ffcdd162e0dcff5664b368d52f6c5dd9443 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,59 +1,59 @@ -// #[cfg(feature = "ffi")] -// pub mod ffi; +mod communication; +mod error; +mod setup2; -// mod actors; -// pub(crate) mod communication; -// pub(crate) mod connector; -// pub(crate) mod endpoint; -// pub mod errors; -// mod serde; +#[cfg(test)] mod my_tests; -mod setup2; -// pub(crate) mod setup; -// mod v2; use crate::common::*; -// use actors::*; -// use endpoint::*; -// use errors::*; +use error::*; +#[derive(Clone, Copy, Debug)] +pub enum LocalComponentId { + Native, + Proto { index: usize }, +} +#[derive(Debug, Clone, Copy)] +pub enum Route { + LocalComponent(LocalComponentId), + Endpoint { index: usize }, +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct MyPortInfo { + polarity: Polarity, + port: PortId, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub(crate) enum Decision { +pub enum Decision { Failure, Success(Predicate), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub(crate) enum Msg { +pub enum Msg { SetupMsg(SetupMsg), CommMsg(CommMsg), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct MyPortInfo { - polarity: Polarity, - port: PortId, -} -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub(crate) enum SetupMsg { - // sent by the passive endpoint to the active endpoint - // MyPortInfo(MyPortInfo), +pub enum SetupMsg { + MyPortInfo(MyPortInfo), LeaderEcho { maybe_leader: ControllerId }, LeaderAnnounce { leader: ControllerId }, YouAreMyParent, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub(crate) struct CommMsg { +pub struct CommMsg { pub round_index: usize, pub contents: CommMsgContents, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub(crate) enum CommMsgContents { +pub enum CommMsgContents { SendPayload { payload_predicate: Predicate, payload: Payload }, Elaborate { partial_oracle: Predicate }, // SINKWARD Failure, // SINKWARD Announce { decision: Decision }, // SINKAWAYS } #[derive(Debug, PartialEq)] -pub(crate) enum CommonSatResult { +pub enum CommonSatResult { FormerNotLatter, LatterNotFormer, Equivalent, @@ -78,19 +78,12 @@ pub struct ProtoComponent { state: ComponentState, ports: HashSet, } -#[derive(Debug)] -pub enum InpRoute { - NativeComponent, - ProtoComponent { index: usize }, - Endpoint { index: usize }, -} pub trait Logger: Debug { fn line_writer(&mut self) -> &mut dyn std::fmt::Write; fn dump_log(&self, w: &mut dyn std::io::Write); } #[derive(Debug, Clone)] pub struct EndpointSetup { - pub polarity: Polarity, pub sock_addr: SocketAddr, pub is_active: bool, } @@ -116,11 +109,17 @@ pub struct EndpointManager { // 2. Events is empty poll: Poll, events: Events, - undrained_endpoints: IndexSet, + polled_undrained: IndexSet, delayed_messages: Vec<(usize, Msg)>, undelayed_messages: Vec<(usize, Msg)>, endpoint_exts: Vec, } +#[derive(Debug, Default)] +pub struct PortInfo { + polarities: HashMap, + peers: HashMap, + routes: HashMap, +} #[derive(Debug)] pub struct Connector { logger: Box, @@ -128,8 +127,7 @@ pub struct Connector { id_manager: IdManager, native_ports: HashSet, proto_components: Vec, - outp_to_inp: HashMap, - inp_to_route: HashMap, + port_info: PortInfo, phased: ConnectorPhased, } #[derive(Debug)] @@ -142,77 +140,147 @@ pub enum ConnectorPhased { endpoint_manager: EndpointManager, neighborhood: Neighborhood, mem_inbox: Vec, + native_actor: NativeActor, // sync invariant: in Nonsync state }, } #[derive(Debug)] pub struct StringLogger(ControllerId, String); -#[derive(Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -pub(crate) struct Predicate { +#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +pub struct Predicate { pub assigned: BTreeMap, } -#[derive(Debug, Default)] -struct SyncBatch { - puts: HashMap, - gets: HashSet, -} pub struct MonitoredReader { bytes: usize, r: R, } -pub enum EndpointRecvErr { - MalformedMessage, - BrokenEndpoint, -} -pub struct SyncContext<'a> { +pub struct SyncProtoContext<'a> { connector: &'a mut Connector, + proto_component_index: usize, } -pub struct NonsyncContext<'a> { +pub struct NonsyncProtoContext<'a> { connector: &'a mut Connector, + proto_component_index: usize, } -enum TryRecyAnyError { - Timeout, - PollFailed, - EndpointRecvErr { error: EndpointRecvErr, index: usize }, - BrokenEndpoint(usize), + +// pub struct MonoPContext<'a> { +// inner: &'a mut ControllerInner, +// ports: &'a mut HashSet, +// mono_ps: &'a mut Vec, +// } +// pub struct PolyPContext<'a> { +// my_subtree_id: SubtreeId, +// inner: &'a mut Connector, +// solution_storage: &'a mut SolutionStorage, +// } +// impl PolyPContext<'_> { +// #[inline(always)] +// fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { +// let Self { solution_storage, my_subtree_id, inner } = self; +// PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } +// } +// } +// struct BranchPContext<'m, 'r> { +// m_ctx: PolyPContext<'m>, +// ports: &'r HashSet, +// predicate: &'r Predicate, +// inbox: &'r HashMap, +// } + +#[derive(Default)] +pub struct SolutionStorage { + old_local: HashSet, + new_local: HashSet, + // this pair acts as SubtreeId -> HashSet which is friendlier to iteration + subtree_solutions: Vec>, + subtree_id_to_index: HashMap, +} +#[derive(Debug)] +pub enum SyncRunResult { + BlockingForRecv, + AllBranchesComplete, + NoBranches, +} +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub enum PolyId { + N, + P { index: usize }, +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub enum SubtreeId { + PolyN, + PolyP { index: usize }, + ChildController { port: PortId }, } +#[derive(Debug, Default)] +pub struct SyncBatch { + to_put: HashMap, + to_get: HashSet, +} +#[derive(Debug)] +pub enum NativeActor { + Nonsync { + sync_result_branch: Option, // invariant: sync_result_branch.to_get.is_empty() + next_batches: Vec, // invariant: nonempty + }, + Sync { + branches: HashMap, + }, +} +#[derive(Debug)] +pub struct NativeBranch { + batch_index: usize, + gotten: HashMap, + to_get: HashSet, +} + //////////////// impl EndpointManager { fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { self.endpoint_exts[index].endpoint.send(msg) } - fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> { + fn try_recv_any( + &mut self, + logger: &mut dyn Logger, + deadline: Instant, + ) -> Result<(usize, Msg), TryRecyAnyError> { use TryRecyAnyError::*; // 1. try messages already buffered if let Some(x) = self.undelayed_messages.pop() { return Ok(x); } - // 2. try read from sockets nonblocking - while let Some(index) = self.undrained_endpoints.pop() { - if let Some(msg) = self.endpoint_exts[index] - .endpoint - .try_recv() - .map_err(|error| EndpointRecvErr { error, index })? - { - return Ok((index, msg)); - } - } - // 3. poll for progress + loop { + // 2. try read a message from an enpoint that previously raised an event + while let Some(index) = self.polled_undrained.pop() { + let endpoint = &mut self.endpoint_exts[index].endpoint; + if let Some(msg) = + endpoint.try_recv().map_err(|error| EndpointError { error, index })? + { + if !endpoint.inbox.is_empty() { + // there may be another message waiting! + self.polled_undrained.insert(index); + } + return Ok((index, msg)); + } + } + // 3. No message yet. poll! let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; for event in self.events.iter() { + log!(logger, "Poll event {:?}", event); let Token(index) = event.token(); - if let Some(msg) = self.endpoint_exts[index] - .endpoint - .try_recv() - .map_err(|error| EndpointRecvErr { error, index })? - { - return Ok((index, msg)); - } + self.polled_undrained.insert(index); } } } fn undelay_all(&mut self) { + if self.undelayed_messages.is_empty() { + // fast path + std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages); + return; + } + // slow path self.undelayed_messages.extend(self.delayed_messages.drain(..)); } } @@ -221,22 +289,6 @@ impl Debug for Endpoint { f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() } } -impl NonsyncContext<'_> { - pub fn new_component(&mut self, moved_ports: HashSet, init_state: ComponentState) { - todo!() - } - pub fn new_channel(&mut self) -> [PortId; 2] { - todo!() - } -} -impl SyncContext<'_> { - pub fn is_firing(&mut self, port: PortId) -> Option { - todo!() - } - pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { - todo!() - } -} impl From for MonitoredReader { fn from(r: R) -> Self { Self { r, bytes: 0 } @@ -259,15 +311,6 @@ impl Into for SetupMsg { Msg::SetupMsg(self) } } -impl Debug for Predicate { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.pad("{")?; - for (port, &v) in self.assigned.iter() { - f.write_fmt(format_args!("{:?}=>{}, ", port, if v { 'T' } else { 'F' }))? - } - f.pad("}") - } -} impl StringLogger { pub fn new(controller_id: ControllerId) -> Self { Self(controller_id, String::default()) @@ -308,8 +351,8 @@ impl IdManager { } } impl Endpoint { - fn try_recv(&mut self) -> Result, EndpointRecvErr> { - use EndpointRecvErr::*; + fn try_recv(&mut self) -> Result, EndpointError> { + use EndpointError::*; // populate inbox as much as possible 'read_loop: loop { match self.stream.read_to_end(&mut self.inbox) { @@ -357,518 +400,133 @@ impl Connector { writeln!(lock, "DEBUG_PRINT:\n{:#?}\n", self).unwrap(); } } +impl Debug for SolutionStorage { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.pad("Solutions: [")?; + for (subtree_id, &index) in self.subtree_id_to_index.iter() { + let sols = &self.subtree_solutions[index]; + f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; + } + f.pad("]") + } +} -// #[derive(Debug)] -// pub enum Connector { -// Unconfigured(Unconfigured), -// Configured(Configured), -// Connected(Connected), // TODO consider boxing. currently takes up a lot of stack space -// } -// #[derive(Debug)] -// pub struct Unconfigured { -// pub controller_id: ControllerId, -// } -// #[derive(Debug)] -// pub struct Configured { -// controller_id: ControllerId, -// polarities: Vec, -// bindings: HashMap, -// protocol_description: Arc, -// main_component: Vec, -// logger: String, -// } -// #[derive(Debug)] -// pub struct Connected { -// native_interface: Vec<(PortId, Polarity)>, -// sync_batches: Vec, -// // controller is cooperatively scheduled with the native application -// // (except for transport layer behind Endpoints, which are managed by the OS) -// // control flow is passed to the controller during methods on Connector (primarily, connect and sync). -// controller: Controller, -// } - -// #[derive(Debug, Copy, Clone)] -// pub enum PortBinding { -// Native, -// Active(SocketAddr), -// Passive(SocketAddr), -// } - -// #[derive(Debug)] -// struct Arena { -// storage: Vec, -// } - -// #[derive(Debug)] -// struct ReceivedMsg { -// recipient: PortId, -// msg: Msg, -// } - -// #[derive(Debug)] -// struct MessengerState { -// poll: Poll, -// events: Events, -// delayed: Vec, -// undelayed: Vec, -// polled_undrained: IndexSet, -// } -// #[derive(Debug)] -// struct ChannelIdStream { -// controller_id: ControllerId, -// next_channel_index: ChannelIndex, -// } - -// #[derive(Debug)] -// struct Controller { -// protocol_description: Arc, -// inner: ControllerInner, -// ephemeral: ControllerEphemeral, -// unrecoverable_error: Option, // prevents future calls to Sync -// } -// #[derive(Debug)] -// struct ControllerInner { -// round_index: usize, -// channel_id_stream: ChannelIdStream, -// endpoint_exts: Arena, -// messenger_state: MessengerState, -// mono_n: MonoN, // state at next round start -// mono_ps: Vec, // state at next round start -// family: ControllerFamily, -// logger: String, -// } - -// /// This structure has its state entirely reset between synchronous rounds -// #[derive(Debug, Default)] -// struct ControllerEphemeral { -// solution_storage: SolutionStorage, -// poly_n: Option, -// poly_ps: Vec, -// mono_ps: Vec, -// port_to_holder: HashMap, -// } - -// #[derive(Debug)] -// struct ControllerFamily { -// parent_port: Option, -// children_ports: Vec, -// } - -// #[derive(Debug)] -// pub(crate) enum SyncRunResult { -// BlockingForRecv, -// AllBranchesComplete, -// NoBranches, -// } - -// // Used to identify poly actors -// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -// enum PolyId { -// N, -// P { index: usize }, -// } - -// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -// pub(crate) enum SubtreeId { -// PolyN, -// PolyP { index: usize }, -// ChildController { port: PortId }, -// } - -// pub(crate) struct MonoPContext<'a> { -// inner: &'a mut ControllerInner, -// ports: &'a mut HashSet, -// mono_ps: &'a mut Vec, -// } -// pub(crate) struct PolyPContext<'a> { -// my_subtree_id: SubtreeId, -// inner: &'a mut ControllerInner, -// solution_storage: &'a mut SolutionStorage, -// } -// impl PolyPContext<'_> { -// #[inline(always)] -// fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { -// let Self { solution_storage, my_subtree_id, inner } = self; -// PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } -// } -// } -// struct BranchPContext<'m, 'r> { -// m_ctx: PolyPContext<'m>, -// ports: &'r HashSet, -// predicate: &'r Predicate, -// inbox: &'r HashMap, -// } - -// #[derive(Default)] -// pub(crate) struct SolutionStorage { -// old_local: HashSet, -// new_local: HashSet, -// // this pair acts as SubtreeId -> HashSet which is friendlier to iteration -// subtree_solutions: Vec>, -// subtree_id_to_index: HashMap, -// } - -// trait Messengerlike { -// fn get_state_mut(&mut self) -> &mut MessengerState; -// fn get_endpoint_mut(&mut self, eport: PortId) -> &mut Endpoint; - -// fn delay(&mut self, received: ReceivedMsg) { -// self.get_state_mut().delayed.push(received); -// } -// fn undelay_all(&mut self) { -// let MessengerState { delayed, undelayed, .. } = self.get_state_mut(); -// undelayed.extend(delayed.drain(..)) -// } - -// fn send(&mut self, to: PortId, msg: Msg) -> Result<(), EndpointErr> { -// self.get_endpoint_mut(to).send(msg) -// } - -// // attempt to receive a message from one of the endpoints before the deadline -// fn recv(&mut self, deadline: Instant) -> Result, MessengerRecvErr> { -// // try get something buffered -// if let Some(x) = self.get_state_mut().undelayed.pop() { -// return Ok(Some(x)); -// } - -// loop { -// // polled_undrained may not be empty -// while let Some(eport) = self.get_state_mut().polled_undrained.pop() { -// if let Some(msg) = self -// .get_endpoint_mut(eport) -// .recv() -// .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? -// { -// // this endpoint MAY still have messages! check again in future -// self.get_state_mut().polled_undrained.insert(eport); -// return Ok(Some(ReceivedMsg { recipient: eport, msg })); -// } -// } - -// let state = self.get_state_mut(); -// match state.poll_events(deadline) { -// Ok(()) => { -// for e in state.events.iter() { -// state.polled_undrained.insert(PortId::from_token(e.token())); -// } -// } -// Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), -// Err(PollDeadlineErr::Timeout) => return Ok(None), -// } -// } -// } -// fn recv_blocking(&mut self) -> Result { -// // try get something buffered -// if let Some(x) = self.get_state_mut().undelayed.pop() { -// return Ok(x); -// } - -// loop { -// // polled_undrained may not be empty -// while let Some(eport) = self.get_state_mut().polled_undrained.pop() { -// if let Some(msg) = self -// .get_endpoint_mut(eport) -// .recv() -// .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? -// { -// // this endpoint MAY still have messages! check again in future -// self.get_state_mut().polled_undrained.insert(eport); -// return Ok(ReceivedMsg { recipient: eport, msg }); -// } -// } - -// let state = self.get_state_mut(); - -// state -// .poll -// .poll(&mut state.events, None) -// .map_err(|_| MessengerRecvErr::PollingFailed)?; -// for e in state.events.iter() { -// state.polled_undrained.insert(PortId::from_token(e.token())); -// } -// } -// } -// } - -// ///////////////////////////////// -// impl Debug for SolutionStorage { -// fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { -// f.pad("Solutions: [")?; -// for (subtree_id, &index) in self.subtree_id_to_index.iter() { -// let sols = &self.subtree_solutions[index]; -// f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; -// } -// f.pad("]") -// } -// } -// impl From for SyncErr { -// fn from(e: EvalErr) -> SyncErr { -// SyncErr::EvalErr(e) -// } -// } -// impl From for SyncErr { -// fn from(e: MessengerRecvErr) -> SyncErr { -// SyncErr::MessengerRecvErr(e) -// } -// } -// impl From for ConnectErr { -// fn from(e: MessengerRecvErr) -> ConnectErr { -// ConnectErr::MessengerRecvErr(e) -// } -// } -// impl Default for Arena { -// fn default() -> Self { -// Self { storage: vec![] } -// } -// } -// impl Arena { -// pub fn alloc(&mut self, t: T) -> PortId { -// self.storage.push(t); -// let l: u32 = self.storage.len().try_into().unwrap(); -// PortId::from_raw(l - 1u32) -// } -// pub fn get(&self, key: PortId) -> Option<&T> { -// self.storage.get(key.to_raw() as usize) -// } -// pub fn get_mut(&mut self, key: PortId) -> Option<&mut T> { -// self.storage.get_mut(key.to_raw() as usize) -// } -// pub fn type_convert(self, f: impl FnMut((PortId, T)) -> X) -> Arena { -// Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() } -// } -// pub fn iter(&self) -> impl Iterator { -// self.keyspace().zip(self.storage.iter()) -// } -// pub fn len(&self) -> usize { -// self.storage.len() -// } -// pub fn keyspace(&self) -> impl Iterator { -// (0u32..self.storage.len().try_into().unwrap()).map(PortId::from_raw) -// } -// } - -// impl ChannelIdStream { -// fn new(controller_id: ControllerId) -> Self { -// Self { controller_id, next_channel_index: 0 } -// } -// fn next(&mut self) -> ChannelId { -// self.next_channel_index += 1; -// ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 } -// } -// } - -// impl MessengerState { -// // does NOT guarantee that events is non-empty -// fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> { -// use PollDeadlineErr::*; -// self.events.clear(); -// let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; -// self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; -// Ok(()) -// } -// } -// impl From for ConnectErr { -// fn from(e: PollDeadlineErr) -> ConnectErr { -// match e { -// PollDeadlineErr::Timeout => ConnectErr::Timeout, -// PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed, -// } -// } -// } - -// impl std::ops::Not for Polarity { -// type Output = Self; -// fn not(self) -> Self::Output { -// use Polarity::*; -// match self { -// Putter => Getter, -// Getter => Putter, -// } -// } -// } - -// impl Predicate { -// // returns true IFF self.unify would return Equivalent OR FormerNotLatter -// pub fn satisfies(&self, other: &Self) -> bool { -// let mut s_it = self.assigned.iter(); -// let mut s = if let Some(s) = s_it.next() { -// s -// } else { -// return other.assigned.is_empty(); -// }; -// for (oid, ob) in other.assigned.iter() { -// while s.0 < oid { -// s = if let Some(s) = s_it.next() { -// s -// } else { -// return false; -// }; -// } -// if s.0 > oid || s.1 != ob { -// return false; -// } -// } -// true -// } - -// /// Given self and other, two predicates, return the most general Predicate possible, N -// /// such that n.satisfies(self) && n.satisfies(other). -// /// If none exists Nonexistant is returned. -// /// If the resulting predicate is equivlanet to self, other, or both, -// /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. -// /// otherwise New(N) is returned. -// pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { -// use CommonSatResult::*; -// // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. -// let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; -// let [mut s, mut o] = [s_it.next(), o_it.next()]; -// // lists of assignments in self but not other and vice versa. -// let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; -// loop { -// match [s, o] { -// [None, None] => break, -// [None, Some(x)] => { -// o_not_s.push(x); -// o_not_s.extend(o_it); -// break; -// } -// [Some(x), None] => { -// s_not_o.push(x); -// s_not_o.extend(s_it); -// break; -// } -// [Some((sid, sb)), Some((oid, ob))] => { -// if sid < oid { -// // o is missing this element -// s_not_o.push((sid, sb)); -// s = s_it.next(); -// } else if sid > oid { -// // s is missing this element -// o_not_s.push((oid, ob)); -// o = o_it.next(); -// } else if sb != ob { -// assert_eq!(sid, oid); -// // both predicates assign the variable but differ on the value -// return Nonexistant; -// } else { -// // both predicates assign the variable to the same value -// s = s_it.next(); -// o = o_it.next(); -// } -// } -// } -// } -// // Observed zero inconsistencies. A unified predicate exists... -// match [s_not_o.is_empty(), o_not_s.is_empty()] { -// [true, true] => Equivalent, // ... equivalent to both. -// [false, true] => FormerNotLatter, // ... equivalent to self. -// [true, false] => LatterNotFormer, // ... equivalent to other. -// [false, false] => { -// // ... which is the union of the predicates' assignments but -// // is equivalent to neither self nor other. -// let mut new = self.clone(); -// for (&id, &b) in o_not_s { -// new.assigned.insert(id, b); -// } -// New(new) -// } -// } -// } - -// pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { -// self.assigned -// .iter() -// .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) -// } - -// pub fn batch_assign_nones( -// &mut self, -// channel_ids: impl Iterator, -// value: bool, -// ) { -// for channel_id in channel_ids { -// self.assigned.entry(channel_id).or_insert(value); -// } -// } -// pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option { -// self.assigned.insert(channel_id, value) -// } -// pub fn union_with(&self, other: &Self) -> Option { -// let mut res = self.clone(); -// for (&channel_id, &assignment_1) in other.assigned.iter() { -// match res.assigned.insert(channel_id, assignment_1) { -// Some(assignment_2) if assignment_1 != assignment_2 => return None, -// _ => {} -// } -// } -// Some(res) -// } -// pub fn query(&self, x: ChannelId) -> Option { -// self.assigned.get(&x).copied() -// } -// pub fn new_trivial() -> Self { -// Self { assigned: Default::default() } -// } -// } - -// #[test] -// fn pred_sat() { -// use maplit::btreemap; -// let mut c = ChannelIdStream::new(0); -// let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); -// let p = Predicate::new_trivial(); -// let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; -// let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; -// let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; -// let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } }; - -// assert!(p.satisfies(&p)); -// assert!(p_0t.satisfies(&p_0t)); -// assert!(p_0f.satisfies(&p_0f)); -// assert!(p_0f_3f.satisfies(&p_0f_3f)); -// assert!(p_0f_3t.satisfies(&p_0f_3t)); - -// assert!(p_0t.satisfies(&p)); -// assert!(p_0f.satisfies(&p)); -// assert!(p_0f_3f.satisfies(&p_0f)); -// assert!(p_0f_3t.satisfies(&p_0f)); - -// assert!(!p.satisfies(&p_0t)); -// assert!(!p.satisfies(&p_0f)); -// assert!(!p_0f.satisfies(&p_0t)); -// assert!(!p_0t.satisfies(&p_0f)); -// assert!(!p_0f_3f.satisfies(&p_0f_3t)); -// assert!(!p_0f_3t.satisfies(&p_0f_3f)); -// assert!(!p_0t.satisfies(&p_0f_3f)); -// assert!(!p_0f.satisfies(&p_0f_3f)); -// assert!(!p_0t.satisfies(&p_0f_3t)); -// assert!(!p_0f.satisfies(&p_0f_3t)); -// } - -// #[test] -// fn pred_common_sat() { -// use maplit::btreemap; -// use CommonSatResult::*; - -// let mut c = ChannelIdStream::new(0); -// let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); -// let p = Predicate::new_trivial(); -// let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; -// let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; -// let p_3f = Predicate { assigned: btreemap! { ch[3] => false } }; -// let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; -// let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } }; - -// assert_eq![p.common_satisfier(&p), Equivalent]; -// assert_eq![p_0t.common_satisfier(&p_0t), Equivalent]; +impl Predicate { + // returns true IFF self.unify would return Equivalent OR FormerNotLatter + pub fn satisfies(&self, other: &Self) -> bool { + let mut s_it = self.assigned.iter(); + let mut s = if let Some(s) = s_it.next() { + s + } else { + return other.assigned.is_empty(); + }; + for (oid, ob) in other.assigned.iter() { + while s.0 < oid { + s = if let Some(s) = s_it.next() { + s + } else { + return false; + }; + } + if s.0 > oid || s.1 != ob { + return false; + } + } + true + } -// assert_eq![p.common_satisfier(&p_0t), LatterNotFormer]; -// assert_eq![p_0t.common_satisfier(&p), FormerNotLatter]; + /// Given self and other, two predicates, return the most general Predicate possible, N + /// such that n.satisfies(self) && n.satisfies(other). + /// If none exists Nonexistant is returned. + /// If the resulting predicate is equivlanet to self, other, or both, + /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. + /// otherwise New(N) is returned. + pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { + use CommonSatResult::*; + // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. + let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; + let [mut s, mut o] = [s_it.next(), o_it.next()]; + // lists of assignments in self but not other and vice versa. + let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; + loop { + match [s, o] { + [None, None] => break, + [None, Some(x)] => { + o_not_s.push(x); + o_not_s.extend(o_it); + break; + } + [Some(x), None] => { + s_not_o.push(x); + s_not_o.extend(s_it); + break; + } + [Some((sid, sb)), Some((oid, ob))] => { + if sid < oid { + // o is missing this element + s_not_o.push((sid, sb)); + s = s_it.next(); + } else if sid > oid { + // s is missing this element + o_not_s.push((oid, ob)); + o = o_it.next(); + } else if sb != ob { + assert_eq!(sid, oid); + // both predicates assign the variable but differ on the value + return Nonexistant; + } else { + // both predicates assign the variable to the same value + s = s_it.next(); + o = o_it.next(); + } + } + } + } + // Observed zero inconsistencies. A unified predicate exists... + match [s_not_o.is_empty(), o_not_s.is_empty()] { + [true, true] => Equivalent, // ... equivalent to both. + [false, true] => FormerNotLatter, // ... equivalent to self. + [true, false] => LatterNotFormer, // ... equivalent to other. + [false, false] => { + // ... which is the union of the predicates' assignments but + // is equivalent to neither self nor other. + let mut new = self.clone(); + for (&id, &b) in o_not_s { + new.assigned.insert(id, b); + } + New(new) + } + } + } -// assert_eq![p_0t.common_satisfier(&p_0f), Nonexistant]; -// assert_eq![p_0f_3t.common_satisfier(&p_0f_3f), Nonexistant]; -// assert_eq![p_0f_3t.common_satisfier(&p_3f), Nonexistant]; -// assert_eq![p_3f.common_satisfier(&p_0f_3t), Nonexistant]; + pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { + self.assigned + .iter() + .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) + } -// assert_eq![p_0f.common_satisfier(&p_3f), New(p_0f_3f)]; -// } + pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator, value: bool) { + for channel_id in channel_ids { + self.assigned.entry(channel_id).or_insert(value); + } + } + pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option { + self.assigned.insert(channel_id, value) + } + pub fn union_with(&self, other: &Self) -> Option { + let mut res = self.clone(); + for (&channel_id, &assignment_1) in other.assigned.iter() { + match res.assigned.insert(channel_id, assignment_1) { + Some(assignment_2) if assignment_1 != assignment_2 => return None, + _ => {} + } + } + Some(res) + } + pub fn query(&self, x: PortId) -> Option { + self.assigned.get(&x).copied() + } + pub fn new_trivial() -> Self { + Self { assigned: Default::default() } + } +}