#[cfg(feature = "ffi")] pub mod ffi; mod actors; pub(crate) mod communication; pub(crate) mod connector; pub(crate) mod endpoint; pub mod errors; // pub mod experimental; mod serde; pub(crate) mod setup; pub(crate) type ProtocolD = crate::protocol::ProtocolDescriptionImpl; pub(crate) type ProtocolS = crate::protocol::ComponentStateImpl; use crate::common::*; use actors::*; use endpoint::*; use errors::*; #[derive(Debug, PartialEq)] pub(crate) enum CommonSatResult { FormerNotLatter, LatterNotFormer, Equivalent, New(Predicate), Nonexistant, } #[derive(Clone, Eq, PartialEq, Hash)] pub(crate) struct Predicate { pub assigned: BTreeMap, } #[derive(Debug, Default)] struct SyncBatch { puts: HashMap, gets: HashSet, } #[derive(Debug)] pub enum Connector { Unconfigured(Unconfigured), Configured(Configured), Connected(Connected), // TODO consider boxing. currently takes up a lot of stack real estate } #[derive(Debug)] pub struct Unconfigured { pub controller_id: ControllerId, } #[derive(Debug)] pub struct Configured { controller_id: ControllerId, polarities: Vec, bindings: HashMap, protocol_description: Arc, main_component: Vec, } #[derive(Debug)] pub struct Connected { native_interface: Vec<(Key, Polarity)>, sync_batches: Vec, controller: Controller, } #[derive(Debug, Copy, Clone)] pub enum PortBinding { Native, Active(SocketAddr), Passive(SocketAddr), } #[derive(Debug)] struct Arena { storage: Vec, } #[derive(Debug)] struct ReceivedMsg { recipient: Key, msg: Msg, } #[derive(Debug)] struct MessengerState { poll: Poll, events: Events, delayed: Vec, undelayed: Vec, polled_undrained: IndexSet, } #[derive(Debug)] struct ChannelIdStream { controller_id: ControllerId, next_channel_index: ChannelIndex, } #[derive(Debug)] enum RoundHistory { Consistent(Predicate, (MonoN, PolyN), Vec<(MonoP, PolyP)>), Inconsistent(SolutionStorage, PolyN, Vec), } #[derive(Debug)] struct Controller { protocol_description: Arc, inner: ControllerInner, ephemeral: ControllerEphemeral, round_histories: Vec, } #[derive(Debug)] struct ControllerInner { round_index: usize, channel_id_stream: ChannelIdStream, endpoint_exts: Arena, messenger_state: MessengerState, mono_n: Option, mono_ps: Vec, family: ControllerFamily, logger: String, } /// This structure has its state entirely reset between synchronous rounds #[derive(Debug, Default)] struct ControllerEphemeral { solution_storage: SolutionStorage, poly_n: Option, poly_ps: Vec, ekey_to_holder: HashMap, } #[derive(Debug)] struct ControllerFamily { parent_ekey: Option, children_ekeys: Vec, } #[derive(Debug)] pub(crate) enum SyncRunResult { BlockingForRecv, AllBranchesComplete, NoBranches, } // Used to identify poly actors #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] enum PolyId { N, P { index: usize }, } #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub(crate) enum SubtreeId { PolyN, PolyP { index: usize }, ChildController { ekey: Key }, } pub(crate) struct MonoPContext<'a> { inner: &'a mut ControllerInner, ekeys: &'a mut HashSet, } pub(crate) struct PolyPContext<'a> { my_subtree_id: SubtreeId, inner: &'a mut ControllerInner, solution_storage: &'a mut SolutionStorage, } impl PolyPContext<'_> { #[inline(always)] fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { let Self { solution_storage, my_subtree_id, inner } = self; PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } } } struct BranchPContext<'m, 'r> { m_ctx: PolyPContext<'m>, ekeys: &'r HashSet, predicate: &'r Predicate, inbox: &'r HashMap, } #[derive(Default)] pub(crate) struct SolutionStorage { old_local: HashSet, new_local: HashSet, // this pair acts as SubtreeId -> HashSet which is friendlier to iteration subtree_solutions: Vec>, subtree_id_to_index: HashMap, } trait Messengerlike { fn get_state_mut(&mut self) -> &mut MessengerState; fn get_endpoint_mut(&mut self, eekey: Key) -> &mut Endpoint; fn delay(&mut self, received: ReceivedMsg) { self.get_state_mut().delayed.push(received); } fn undelay_all(&mut self) { let MessengerState { delayed, undelayed, .. } = self.get_state_mut(); undelayed.extend(delayed.drain(..)) } fn send(&mut self, to: Key, msg: Msg) -> Result<(), EndpointErr> { self.get_endpoint_mut(to).send(msg) } // attempt to receive a message from one of the endpoints before the deadline fn recv(&mut self, deadline: Instant) -> Result, MessengerRecvErr> { // try get something buffered if let Some(x) = self.get_state_mut().undelayed.pop() { return Ok(Some(x)); } loop { // polled_undrained may not be empty while let Some(eekey) = self.get_state_mut().polled_undrained.pop() { if let Some(msg) = self .get_endpoint_mut(eekey) .recv() .map_err(|e| MessengerRecvErr::EndpointErr(eekey, e))? { // this endpoint MAY still have messages! check again in future self.get_state_mut().polled_undrained.insert(eekey); return Ok(Some(ReceivedMsg { recipient: eekey, msg })); } } let state = self.get_state_mut(); match state.poll_events(deadline) { Ok(()) => { for e in state.events.iter() { state.polled_undrained.insert(Key::from_token(e.token())); } } Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), Err(PollDeadlineErr::Timeout) => return Ok(None), } } } // attempt to receive a message from one of the endpoints before the deadline fn recv_until( &mut self, deadline: Option, ) -> Result, MessengerRecvErr> { // try get something buffered if let Some(x) = self.get_state_mut().undelayed.pop() { return Ok(Some(x)); } loop { // polled_undrained may not be empty while let Some(eekey) = self.get_state_mut().polled_undrained.pop() { if let Some(msg) = self .get_endpoint_mut(eekey) .recv() .map_err(|e| MessengerRecvErr::EndpointErr(eekey, e))? { // this endpoint MAY still have messages! check again in future self.get_state_mut().polled_undrained.insert(eekey); return Ok(Some(ReceivedMsg { recipient: eekey, msg })); } } let state = self.get_state_mut(); match state.poll_events_until(deadline) { Ok(()) => { for e in state.events.iter() { state.polled_undrained.insert(Key::from_token(e.token())); } } Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), Err(PollDeadlineErr::Timeout) => return Ok(None), } } } } ///////////////////////////////// impl Debug for SolutionStorage { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.pad("Solutions: [")?; for (subtree_id, &index) in self.subtree_id_to_index.iter() { let sols = &self.subtree_solutions[index]; f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; } f.pad("]") } } impl From for SyncErr { fn from(e: EvalErr) -> SyncErr { SyncErr::EvalErr(e) } } impl From for SyncErr { fn from(e: MessengerRecvErr) -> SyncErr { SyncErr::MessengerRecvErr(e) } } impl From for ConnectErr { fn from(e: MessengerRecvErr) -> ConnectErr { ConnectErr::MessengerRecvErr(e) } } // impl From for MessengerRecvErr { // fn from(e: EndpointErr) -> MessengerRecvErr { // MessengerRecvErr::EndpointErr(e) // } // } impl Default for Arena { fn default() -> Self { Self { storage: vec![] } } } impl Arena { pub fn alloc(&mut self, t: T) -> Key { self.storage.push(t); Key::from_raw(self.storage.len() - 1) } pub fn get(&self, key: Key) -> Option<&T> { self.storage.get(key.to_raw() as usize) } pub fn get_mut(&mut self, key: Key) -> Option<&mut T> { self.storage.get_mut(key.to_raw() as usize) } pub fn type_convert(self, f: impl FnMut((Key, T)) -> X) -> Arena { Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() } } pub fn iter(&self) -> impl Iterator { self.keyspace().zip(self.storage.iter()) } pub fn len(&self) -> usize { self.storage.len() } pub fn keyspace(&self) -> impl Iterator { (0..self.storage.len()).map(Key::from_raw) } } impl ChannelIdStream { fn new(controller_id: ControllerId) -> Self { Self { controller_id, next_channel_index: 0 } } fn next(&mut self) -> ChannelId { self.next_channel_index += 1; ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 } } } impl MessengerState { fn with_event_capacity(event_capacity: usize) -> Result { Ok(Self { poll: Poll::new()?, events: Events::with_capacity(event_capacity), delayed: Default::default(), undelayed: Default::default(), polled_undrained: Default::default(), }) } // does NOT guarantee that events is non-empty fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> { use PollDeadlineErr::*; self.events.clear(); let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; Ok(()) } fn poll_events_until(&mut self, deadline: Option) -> Result<(), PollDeadlineErr> { use PollDeadlineErr::*; self.events.clear(); let poll_timeout = if let Some(d) = deadline { Some(d.checked_duration_since(Instant::now()).ok_or(Timeout)?) } else { None }; self.poll.poll(&mut self.events, poll_timeout).map_err(|_| PollingFailed)?; Ok(()) } } impl From for ConnectErr { fn from(e: PollDeadlineErr) -> ConnectErr { match e { PollDeadlineErr::Timeout => ConnectErr::Timeout, PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed, } } } impl std::ops::Not for Polarity { type Output = Self; fn not(self) -> Self::Output { use Polarity::*; match self { Putter => Getter, Getter => Putter, } } } impl Predicate { // returns true IFF self.unify would return Equivalent OR FormerNotLatter pub fn satisfies(&self, other: &Self) -> bool { let mut s_it = self.assigned.iter(); let mut s = if let Some(s) = s_it.next() { s } else { return other.assigned.is_empty(); }; for (oid, ob) in other.assigned.iter() { while s.0 < oid { s = if let Some(s) = s_it.next() { s } else { return false; }; } if s.0 > oid || s.1 != ob { return false; } } true } /// Given self and other, two predicates, return the most general Predicate possible, N /// such that n.satisfies(self) && n.satisfies(other). /// If none exists Nonexistant is returned. /// If the resulting predicate is equivlanet to self, other, or both, /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. /// otherwise New(N) is returned. pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { use CommonSatResult::*; // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; let [mut s, mut o] = [s_it.next(), o_it.next()]; // lists of assignments in self but not other and vice versa. let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; loop { match [s, o] { [None, None] => break, [None, Some(x)] => { o_not_s.push(x); o_not_s.extend(o_it); break; } [Some(x), None] => { s_not_o.push(x); s_not_o.extend(s_it); break; } [Some((sid, sb)), Some((oid, ob))] => { if sid < oid { // o is missing this element s_not_o.push((sid, sb)); s = s_it.next(); } else if sid > oid { // s is missing this element o_not_s.push((oid, ob)); o = o_it.next(); } else if sb != ob { assert_eq!(sid, oid); // both predicates assign the variable but differ on the value return Nonexistant; } else { // both predicates assign the variable to the same value s = s_it.next(); o = o_it.next(); } } } } // Observed zero inconsistencies. A unified predicate exists... match [s_not_o.is_empty(), o_not_s.is_empty()] { [true, true] => Equivalent, // ... equivalent to both. [false, true] => FormerNotLatter, // ... equivalent to self. [true, false] => LatterNotFormer, // ... equivalent to other. [false, false] => { // ... which is the union of the predicates' assignments but // is equivalent to neither self nor other. let mut new = self.clone(); for (&id, &b) in o_not_s { new.assigned.insert(id, b); } New(new) } } } pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { self.assigned .iter() .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) } pub fn batch_assign_nones( &mut self, channel_ids: impl Iterator, value: bool, ) { for channel_id in channel_ids { self.assigned.entry(channel_id).or_insert(value); } } pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option { self.assigned.insert(channel_id, value) } pub fn union_with(&self, other: &Self) -> Option { let mut res = self.clone(); for (&channel_id, &assignment_1) in other.assigned.iter() { match res.assigned.insert(channel_id, assignment_1) { Some(assignment_2) if assignment_1 != assignment_2 => return None, _ => {} } } Some(res) } pub fn query(&self, x: ChannelId) -> Option { self.assigned.get(&x).copied() } pub fn new_trivial() -> Self { Self { assigned: Default::default() } } } impl Debug for Predicate { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.pad("{")?; for (ChannelId { controller_id, channel_index }, &v) in self.assigned.iter() { f.write_fmt(format_args!( "({:?},{:?})=>{}, ", controller_id, channel_index, if v { 'T' } else { 'F' } ))? } f.pad("}") } } #[test] fn pred_sat() { use maplit::btreemap; let mut c = ChannelIdStream::new(0); let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); let p = Predicate::new_trivial(); let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } }; assert!(p.satisfies(&p)); assert!(p_0t.satisfies(&p_0t)); assert!(p_0f.satisfies(&p_0f)); assert!(p_0f_3f.satisfies(&p_0f_3f)); assert!(p_0f_3t.satisfies(&p_0f_3t)); assert!(p_0t.satisfies(&p)); assert!(p_0f.satisfies(&p)); assert!(p_0f_3f.satisfies(&p_0f)); assert!(p_0f_3t.satisfies(&p_0f)); assert!(!p.satisfies(&p_0t)); assert!(!p.satisfies(&p_0f)); assert!(!p_0f.satisfies(&p_0t)); assert!(!p_0t.satisfies(&p_0f)); assert!(!p_0f_3f.satisfies(&p_0f_3t)); assert!(!p_0f_3t.satisfies(&p_0f_3f)); assert!(!p_0t.satisfies(&p_0f_3f)); assert!(!p_0f.satisfies(&p_0f_3f)); assert!(!p_0t.satisfies(&p_0f_3t)); assert!(!p_0f.satisfies(&p_0f_3t)); } #[test] fn pred_common_sat() { use maplit::btreemap; use CommonSatResult::*; let mut c = ChannelIdStream::new(0); let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); let p = Predicate::new_trivial(); let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; let p_3f = Predicate { assigned: btreemap! { ch[3] => false } }; let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } }; assert_eq![p.common_satisfier(&p), Equivalent]; assert_eq![p_0t.common_satisfier(&p_0t), Equivalent]; assert_eq![p.common_satisfier(&p_0t), LatterNotFormer]; assert_eq![p_0t.common_satisfier(&p), FormerNotLatter]; assert_eq![p_0t.common_satisfier(&p_0f), Nonexistant]; assert_eq![p_0f_3t.common_satisfier(&p_0f_3f), Nonexistant]; assert_eq![p_0f_3t.common_satisfier(&p_3f), Nonexistant]; assert_eq![p_3f.common_satisfier(&p_0f_3t), Nonexistant]; assert_eq![p_0f.common_satisfier(&p_3f), New(p_0f_3f)]; }