diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..5dbac51a7f84823a0de49e6b4521231d187e60f6 --- /dev/null +++ b/src/runtime/mod.rs @@ -0,0 +1,507 @@ +#[cfg(feature = "ffi")] +pub mod ffi; + +mod actors; +pub(crate) mod communication; +pub(crate) mod connector; +pub(crate) mod endpoint; +pub mod errors; +mod predicate; // TODO later +mod serde; +pub(crate) mod setup; + +pub(crate) type ProtocolD = crate::protocol::ProtocolDescriptionImpl; +pub(crate) type ProtocolS = crate::protocol::ComponentStateImpl; + +use crate::common::*; +use actors::*; +use endpoint::*; +use errors::*; + +#[derive(Debug, PartialEq)] +pub(crate) enum CommonSatResult { + FormerNotLatter, + LatterNotFormer, + Equivalent, + New(Predicate), + Nonexistant, +} + +#[derive(Clone, Eq, PartialEq, Hash)] +pub(crate) struct Predicate { + pub assigned: BTreeMap, +} + +#[derive(Debug, Default)] +struct SyncBatch { + puts: HashMap, + gets: HashSet, +} + +#[derive(Debug)] +pub enum Connector { + Unconfigured(Unconfigured), + Configured(Configured), + Connected(Connected), // TODO consider boxing. currently takes up a lot of stack real estate +} +#[derive(Debug)] +pub struct Unconfigured { + pub controller_id: ControllerId, +} +#[derive(Debug)] +pub struct Connected { + native_interface: Vec<(Key, Polarity)>, + sync_batches: Vec, + controller: Controller, +} +#[derive(Debug)] +pub struct Configured { + // invariant: proto_maybe_bindings.len() is the size of the protocol's interface + controller_id: ControllerId, + proto_maybe_bindings: Vec<(Polarity, Option)>, + protocol_description: Arc, +} + +#[derive(Debug, Copy, Clone)] +pub enum PortBinding { + Native, + Active(SocketAddr), + Passive(SocketAddr), +} + +#[derive(Debug)] +struct Arena { + storage: Vec, +} + +#[derive(Debug)] +struct ReceivedMsg { + recipient: Key, + msg: Msg, +} + +#[derive(Debug)] +struct MessengerState { + poll: Poll, + events: Events, + delayed: Vec, + undelayed: Vec, + polled_undrained: IndexSet, +} +#[derive(Debug)] +struct ChannelIdStream { + controller_id: ControllerId, + next_channel_index: ChannelIndex, +} + +#[derive(Debug)] +struct Controller { + protocol_description: Arc, + inner: ControllerInner, + ephemeral: ControllerEphemeral, +} +#[derive(Debug)] +struct ControllerInner { + round_index: usize, + channel_id_stream: ChannelIdStream, + endpoint_exts: Arena, + messenger_state: MessengerState, + mono_n: Option, + mono_ps: Vec, + family: ControllerFamily, +} + +/// This structure has its state entirely reset between synchronous rounds +#[derive(Debug, Default)] +struct ControllerEphemeral { + solution_storage: SolutionStorage, + poly_n: Option, + poly_ps: Vec, + ekey_to_holder: HashMap, +} + +#[derive(Debug)] +struct ControllerFamily { + parent_ekey: Option, + children_ekeys: Vec, +} + +#[derive(Debug)] +pub(crate) enum SyncRunResult { + BlockingForRecv, + AllBranchesComplete, + NoBranches, +} + +// Used to identify poly actors +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +enum PolyId { + N, + P { index: usize }, +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub(crate) enum SubtreeId { + PolyN, + PolyP { index: usize }, + ChildController { ekey: Key }, +} + +pub(crate) struct MonoPContext<'a> { + inner: &'a mut ControllerInner, + ekeys: &'a mut HashSet, +} +pub(crate) struct PolyPContext<'a> { + my_subtree_id: SubtreeId, + inner: &'a mut ControllerInner, + solution_storage: &'a mut SolutionStorage, +} +impl PolyPContext<'_> { + #[inline(always)] + fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { + let Self { solution_storage, my_subtree_id, inner } = self; + PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } + } +} +struct BranchPContext<'m, 'r> { + m_ctx: PolyPContext<'m>, + ekeys: &'r HashSet, + predicate: &'r Predicate, + inbox: &'r HashMap, +} + +#[derive(Debug, Default)] +pub(crate) struct SolutionStorage { + old_local: HashSet, + new_local: HashSet, + // this pair acts as SubtreeId -> HashSet which is friendlier to iteration + subtree_solutions: Vec>, + subtree_id_to_index: HashMap, +} + +trait Messengerlike { + fn get_state_mut(&mut self) -> &mut MessengerState; + fn get_endpoint_mut(&mut self, eekey: Key) -> &mut Endpoint; + + fn delay(&mut self, received: ReceivedMsg) { + self.get_state_mut().delayed.push(received); + } + fn undelay_all(&mut self) { + let MessengerState { delayed, undelayed, .. } = self.get_state_mut(); + undelayed.extend(delayed.drain(..)) + } + + fn send(&mut self, to: Key, msg: Msg) -> Result<(), EndpointErr> { + self.get_endpoint_mut(to).send(msg) + } + + // attempt to receive a message from one of the endpoints before the deadline + fn recv(&mut self, deadline: Instant) -> Result, MessengerRecvErr> { + // try get something buffered + if let Some(x) = self.get_state_mut().undelayed.pop() { + return Ok(Some(x)); + } + + loop { + // polled_undrained may not be empty + while let Some(eekey) = self.get_state_mut().polled_undrained.pop() { + if let Some(msg) = self.get_endpoint_mut(eekey).recv()? { + // this endpoint MAY still have messages! check again in future + self.get_state_mut().polled_undrained.insert(eekey); + return Ok(Some(ReceivedMsg { recipient: eekey, msg })); + } + } + + let state = self.get_state_mut(); + match state.poll_events(deadline) { + Ok(()) => { + for e in state.events.iter() { + state.polled_undrained.insert(Key::from_token(e.token())); + } + } + Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), + Err(PollDeadlineErr::Timeout) => return Ok(None), + } + } + } +} + +///////////////////////////////// + +impl From for SyncErr { + fn from(e: EvalErr) -> SyncErr { + SyncErr::EvalErr(e) + } +} +impl From for SyncErr { + fn from(e: MessengerRecvErr) -> SyncErr { + SyncErr::MessengerRecvErr(e) + } +} +impl From for ConnectErr { + fn from(e: MessengerRecvErr) -> ConnectErr { + ConnectErr::MessengerRecvErr(e) + } +} +impl From for MessengerRecvErr { + fn from(e: EndpointErr) -> MessengerRecvErr { + MessengerRecvErr::EndpointErr(e) + } +} +impl Default for Arena { + fn default() -> Self { + Self { storage: vec![] } + } +} +impl Arena { + pub fn alloc(&mut self, t: T) -> Key { + self.storage.push(t); + Key::from_raw(self.storage.len() as u64 - 1) + } + pub fn get(&self, key: Key) -> Option<&T> { + self.storage.get(key.to_raw() as usize) + } + pub fn get_mut(&mut self, key: Key) -> Option<&mut T> { + self.storage.get_mut(key.to_raw() as usize) + } + pub fn type_convert(self, f: impl FnMut((Key, T)) -> X) -> Arena { + Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() } + } + pub fn iter(&self) -> impl Iterator { + self.keyspace().zip(self.storage.iter()) + } + pub fn len(&self) -> usize { + self.storage.len() + } + pub fn keyspace(&self) -> impl Iterator { + (0..(self.storage.len() as u64)).map(Key::from_raw) + } +} + +impl ChannelIdStream { + fn new(controller_id: ControllerId) -> Self { + Self { controller_id, next_channel_index: 0 } + } + fn next(&mut self) -> ChannelId { + self.next_channel_index += 1; + ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 } + } +} + +impl MessengerState { + // does NOT guarantee that events is non-empty + fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> { + use PollDeadlineErr::*; + self.events.clear(); + let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; + self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; + Ok(()) + } +} +impl From for ConnectErr { + fn from(e: PollDeadlineErr) -> ConnectErr { + match e { + PollDeadlineErr::Timeout => ConnectErr::Timeout, + PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed, + } + } +} + +impl std::ops::Not for Polarity { + type Output = Self; + fn not(self) -> Self::Output { + use Polarity::*; + match self { + Putter => Getter, + Getter => Putter, + } + } +} + +impl Predicate { + // returns true IFF self.unify would return Equivalent OR FormerNotLatter + pub fn satisfies(&self, other: &Self) -> bool { + let mut s_it = self.assigned.iter(); + let mut s = if let Some(s) = s_it.next() { + s + } else { + return other.assigned.is_empty(); + }; + for (oid, ob) in other.assigned.iter() { + while s.0 < oid { + s = if let Some(s) = s_it.next() { + s + } else { + return false; + }; + } + if s.0 > oid || s.1 != ob { + return false; + } + } + true + } + + /// Given self and other, two predicates, return the most general Predicate possible, N + /// such that n.satisfies(self) && n.satisfies(other). + /// If none exists Nonexistant is returned. + /// If the resulting predicate is equivlanet to self, other, or both, + /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. + /// otherwise New(N) is returned. + pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { + use CommonSatResult::*; + // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. + let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; + let [mut s, mut o] = [s_it.next(), o_it.next()]; + // lists of assignments in self but not other and vice versa. + let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; + loop { + match [s, o] { + [None, None] => break, + [None, Some(x)] => { + o_not_s.push(x); + o_not_s.extend(o_it); + break; + } + [Some(x), None] => { + s_not_o.push(x); + s_not_o.extend(s_it); + break; + } + [Some((sid, sb)), Some((oid, ob))] => { + if sid < oid { + // o is missing this element + s_not_o.push((sid, sb)); + s = s_it.next(); + } else if sid > oid { + // s is missing this element + o_not_s.push((sid, sb)); + o = o_it.next(); + } else if sb != ob { + assert_eq!(sid, oid); + // both predicates assign the variable but differ on the value + return Nonexistant; + } else { + // both predicates assign the variable to the same value + s = s_it.next(); + o = o_it.next(); + } + } + } + } + // Observed zero inconsistencies. A unified predicate exists... + match [s_not_o.is_empty(), o_not_s.is_empty()] { + [true, true] => Equivalent, // ... equivalent to both. + [false, true] => FormerNotLatter, // ... equivalent to self. + [true, false] => LatterNotFormer, // ... equivalent to other. + [false, false] => { + // ... which is the union of the predicates' assignments but + // is equivalent to neither self nor other. + let mut predicate = self.clone(); + for (&id, &b) in o_not_s { + predicate.assigned.insert(id, b); + } + New(predicate) + } + } + } + + pub fn batch_assign_nones( + &mut self, + channel_ids: impl Iterator, + value: bool, + ) { + for channel_id in channel_ids { + self.assigned.entry(channel_id).or_insert(value); + } + } + pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option { + self.assigned.insert(channel_id, value) + } + pub fn union_with(&self, other: &Self) -> Option { + let mut res = self.clone(); + for (&channel_id, &assignment_1) in other.assigned.iter() { + match res.assigned.insert(channel_id, assignment_1) { + Some(assignment_2) if assignment_1 != assignment_2 => return None, + _ => {} + } + } + Some(res) + } + pub fn query(&self, x: ChannelId) -> Option { + self.assigned.get(&x).copied() + } + pub fn new_trivial() -> Self { + Self { assigned: Default::default() } + } +} +impl Debug for Predicate { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + for (ChannelId { controller_id, channel_index }, &v) in self.assigned.iter() { + write!(f, "{:?}=>{}", (controller_id, channel_index), if v { 'T' } else { 'F' })?; + } + Ok(()) + } +} + +#[test] +fn pred_sat() { + use maplit::btreemap; + let mut c = ChannelIdStream::new(0); + let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); + let p = Predicate::new_trivial(); + let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; + let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; + let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; + let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } }; + + assert!(p.satisfies(&p)); + assert!(p_0t.satisfies(&p_0t)); + assert!(p_0f.satisfies(&p_0f)); + assert!(p_0f_3f.satisfies(&p_0f_3f)); + assert!(p_0f_3t.satisfies(&p_0f_3t)); + + assert!(p_0t.satisfies(&p)); + assert!(p_0f.satisfies(&p)); + assert!(p_0f_3f.satisfies(&p_0f)); + assert!(p_0f_3t.satisfies(&p_0f)); + + assert!(!p.satisfies(&p_0t)); + assert!(!p.satisfies(&p_0f)); + assert!(!p_0f.satisfies(&p_0t)); + assert!(!p_0t.satisfies(&p_0f)); + assert!(!p_0f_3f.satisfies(&p_0f_3t)); + assert!(!p_0f_3t.satisfies(&p_0f_3f)); + assert!(!p_0t.satisfies(&p_0f_3f)); + assert!(!p_0f.satisfies(&p_0f_3f)); + assert!(!p_0t.satisfies(&p_0f_3t)); + assert!(!p_0f.satisfies(&p_0f_3t)); +} + +#[test] +fn pred_common_sat() { + use maplit::btreemap; + use CommonSatResult::*; + + let mut c = ChannelIdStream::new(0); + let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); + let p = Predicate::new_trivial(); + let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; + let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; + let p_3f = Predicate { assigned: btreemap! { ch[3] => false } }; + let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; + let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } }; + + assert_eq![p.common_satisfier(&p), Equivalent]; + assert_eq![p_0t.common_satisfier(&p_0t), Equivalent]; + + assert_eq![p.common_satisfier(&p_0t), LatterNotFormer]; + assert_eq![p_0t.common_satisfier(&p), FormerNotLatter]; + + assert_eq![p_0t.common_satisfier(&p_0f), Nonexistant]; + assert_eq![p_0f_3t.common_satisfier(&p_0f_3f), Nonexistant]; + assert_eq![p_0f_3t.common_satisfier(&p_3f), Nonexistant]; + assert_eq![p_3f.common_satisfier(&p_0f_3t), Nonexistant]; + + assert_eq![p_0f.common_satisfier(&p_3f), New(p_0f_3f)]; +}