diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index e33c900e34e8c07738ec7b82234d58841469ba2f..3999f7a07d4287caa3eaccb111afff0a6a0286e3 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -1,23 +1,57 @@ -#[cfg(feature = "ffi")] -pub mod ffi; - -mod actors; -pub(crate) mod communication; -pub(crate) mod connector; -pub(crate) mod endpoint; -pub mod errors; -mod serde; -pub(crate) mod setup; +// #[cfg(feature = "ffi")] +// pub mod ffi; + +// mod actors; +// pub(crate) mod communication; +// pub(crate) mod connector; +// pub(crate) mod endpoint; +// pub mod errors; +// mod serde; +mod my_tests; +mod setup2; +// pub(crate) mod setup; // mod v2; -pub(crate) type ProtocolD = crate::protocol::ProtocolDescriptionImpl; -pub(crate) type ProtocolS = crate::protocol::ComponentStateImpl; - use crate::common::*; -use actors::*; -use endpoint::*; -use errors::*; - +// use actors::*; +// use endpoint::*; +// use errors::*; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub(crate) enum Decision { + Failure, + Success(Predicate), +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub(crate) enum Msg { + SetupMsg(SetupMsg), + CommMsg(CommMsg), +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct MyPortInfo { + polarity: Polarity, + port: PortId, +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub(crate) enum SetupMsg { + // sent by the passive endpoint to the active endpoint + // MyPortInfo(MyPortInfo), + LeaderEcho { maybe_leader: ControllerId }, + LeaderAnnounce { leader: ControllerId }, + YouAreMyParent, +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub(crate) struct CommMsg { + pub round_index: usize, + pub contents: CommMsgContents, +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub(crate) enum CommMsgContents { + SendPayload { payload_predicate: Predicate, payload: Payload }, + Elaborate { partial_oracle: Predicate }, // SINKWARD + Failure, // SINKWARD + Announce { decision: Decision }, // SINKAWAYS +} #[derive(Debug, PartialEq)] pub(crate) enum CommonSatResult { FormerNotLatter, @@ -26,543 +60,753 @@ pub(crate) enum CommonSatResult { New(Predicate), Nonexistant, } - -#[derive(Clone, Eq, PartialEq, Hash)] -pub(crate) struct Predicate { - pub assigned: BTreeMap, +pub struct Endpoint { + inbox: Vec, + stream: mio07::net::TcpStream, } - #[derive(Debug, Default)] -struct SyncBatch { - puts: HashMap, - gets: HashSet, +pub struct IntStream { + next: u32, } - #[derive(Debug)] -pub enum Connector { - Unconfigured(Unconfigured), - Configured(Configured), - Connected(Connected), // TODO consider boxing. currently takes up a lot of stack space +pub struct IdManager { + controller_id: ControllerId, + port_suffix_stream: IntStream, } #[derive(Debug)] -pub struct Unconfigured { - pub controller_id: ControllerId, +pub struct ProtoComponent { + state: ComponentState, + ports: HashSet, } #[derive(Debug)] -pub struct Configured { - controller_id: ControllerId, - polarities: Vec, - bindings: HashMap, - protocol_description: Arc, - main_component: Vec, - logger: String, +pub enum InpRoute { + NativeComponent, + ProtoComponent { index: usize }, + Endpoint { index: usize }, } -#[derive(Debug)] -pub struct Connected { - native_interface: Vec<(Port, Polarity)>, - sync_batches: Vec, - // controller is cooperatively scheduled with the native application - // (except for transport layer behind Endpoints, which are managed by the OS) - // control flow is passed to the controller during methods on Connector (primarily, connect and sync). - controller: Controller, +pub trait Logger: Debug { + fn line_writer(&mut self) -> &mut dyn std::fmt::Write; + fn dump_log(&self, w: &mut dyn std::io::Write); } - -#[derive(Debug, Copy, Clone)] -pub enum PortBinding { - Native, - Active(SocketAddr), - Passive(SocketAddr), +#[derive(Debug, Clone)] +pub struct EndpointSetup { + pub polarity: Polarity, + pub sock_addr: SocketAddr, + pub is_active: bool, } - #[derive(Debug)] -struct Arena { - storage: Vec, +pub struct EndpointExt { + endpoint: Endpoint, + inp_for_emerging_msgs: PortId, } - #[derive(Debug)] -struct ReceivedMsg { - recipient: Port, - msg: Msg, +pub struct Neighborhood { + parent: Option, + children: Vec, // ordered, deduplicated } - #[derive(Debug)] -struct MessengerState { - poll: Poll, - events: Events, - delayed: Vec, - undelayed: Vec, - polled_undrained: IndexSet, +pub struct MemInMsg { + inp: PortId, + msg: Payload, } #[derive(Debug)] -struct ChannelIdStream { - controller_id: ControllerId, - next_channel_index: ChannelIndex, +pub struct EndpointPoller { + poll: mio07::Poll, + events: mio07::Events, + undrained_endpoints: HashSet, + delayed_inp_messages: Vec<(PortId, Msg)>, } - #[derive(Debug)] -struct Controller { - protocol_description: Arc, - inner: ControllerInner, - ephemeral: ControllerEphemeral, - unrecoverable_error: Option, // prevents future calls to Sync +pub struct Connector { + logger: Box, + proto_description: Arc, + id_manager: IdManager, + native_ports: HashSet, + proto_components: Vec, + outp_to_inp: HashMap, + inp_to_route: HashMap, + phased: ConnectorPhased, } #[derive(Debug)] -struct ControllerInner { - round_index: usize, - channel_id_stream: ChannelIdStream, - endpoint_exts: Arena, - messenger_state: MessengerState, - mono_n: MonoN, // state at next round start - mono_ps: Vec, // state at next round start - family: ControllerFamily, - logger: String, -} - -/// This structure has its state entirely reset between synchronous rounds -#[derive(Debug, Default)] -struct ControllerEphemeral { - solution_storage: SolutionStorage, - poly_n: Option, - poly_ps: Vec, - mono_ps: Vec, - port_to_holder: HashMap, +pub enum ConnectorPhased { + Setup { + endpoint_setups: Vec<(PortId, EndpointSetup)>, + surplus_sockets: u16, + }, + Communication { + endpoint_poller: EndpointPoller, + endpoint_exts: Vec, + neighborhood: Neighborhood, + mem_inbox: Vec, + }, } - #[derive(Debug)] -struct ControllerFamily { - parent_port: Option, - children_ports: Vec, +pub struct StringLogger(ControllerId, String); +#[derive(Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +pub(crate) struct Predicate { + pub assigned: BTreeMap, } - -#[derive(Debug)] -pub(crate) enum SyncRunResult { - BlockingForRecv, - AllBranchesComplete, - NoBranches, +#[derive(Debug, Default)] +struct SyncBatch { + puts: HashMap, + gets: HashSet, } - -// Used to identify poly actors -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -enum PolyId { - N, - P { index: usize }, +pub struct MonitoredReader { + bytes: usize, + r: R, } - -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] -pub(crate) enum SubtreeId { - PolyN, - PolyP { index: usize }, - ChildController { port: Port }, +pub enum EndpointRecvErr { + MalformedMessage, + BrokenEndpoint, } - -pub(crate) struct MonoPContext<'a> { - inner: &'a mut ControllerInner, - ports: &'a mut HashSet, - mono_ps: &'a mut Vec, +pub struct SyncContext<'a> { + connector: &'a mut Connector, } -pub(crate) struct PolyPContext<'a> { - my_subtree_id: SubtreeId, - inner: &'a mut ControllerInner, - solution_storage: &'a mut SolutionStorage, +pub struct NonsyncContext<'a> { + connector: &'a mut Connector, } -impl PolyPContext<'_> { - #[inline(always)] - fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { - let Self { solution_storage, my_subtree_id, inner } = self; - PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } +//////////////// +impl Debug for Endpoint { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() } } -struct BranchPContext<'m, 'r> { - m_ctx: PolyPContext<'m>, - ports: &'r HashSet, - predicate: &'r Predicate, - inbox: &'r HashMap, -} - -#[derive(Default)] -pub(crate) struct SolutionStorage { - old_local: HashSet, - new_local: HashSet, - // this pair acts as SubtreeId -> HashSet which is friendlier to iteration - subtree_solutions: Vec>, - subtree_id_to_index: HashMap, -} - -trait Messengerlike { - fn get_state_mut(&mut self) -> &mut MessengerState; - fn get_endpoint_mut(&mut self, eport: Port) -> &mut Endpoint; - - fn delay(&mut self, received: ReceivedMsg) { - self.get_state_mut().delayed.push(received); +impl NonsyncContext<'_> { + pub fn new_component(&mut self, moved_ports: HashSet, init_state: ComponentState) { + todo!() } - fn undelay_all(&mut self) { - let MessengerState { delayed, undelayed, .. } = self.get_state_mut(); - undelayed.extend(delayed.drain(..)) + pub fn new_channel(&mut self) -> [PortId; 2] { + todo!() } - - fn send(&mut self, to: Port, msg: Msg) -> Result<(), EndpointErr> { - self.get_endpoint_mut(to).send(msg) - } - - // attempt to receive a message from one of the endpoints before the deadline - fn recv(&mut self, deadline: Instant) -> Result, MessengerRecvErr> { - // try get something buffered - if let Some(x) = self.get_state_mut().undelayed.pop() { - return Ok(Some(x)); - } - - loop { - // polled_undrained may not be empty - while let Some(eport) = self.get_state_mut().polled_undrained.pop() { - if let Some(msg) = self - .get_endpoint_mut(eport) - .recv() - .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? - { - // this endpoint MAY still have messages! check again in future - self.get_state_mut().polled_undrained.insert(eport); - return Ok(Some(ReceivedMsg { recipient: eport, msg })); - } - } - - let state = self.get_state_mut(); - match state.poll_events(deadline) { - Ok(()) => { - for e in state.events.iter() { - state.polled_undrained.insert(Port::from_token(e.token())); - } - } - Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), - Err(PollDeadlineErr::Timeout) => return Ok(None), - } - } +} +impl SyncContext<'_> { + pub fn is_firing(&mut self, port: PortId) -> Option { + todo!() } - fn recv_blocking(&mut self) -> Result { - // try get something buffered - if let Some(x) = self.get_state_mut().undelayed.pop() { - return Ok(x); - } - - loop { - // polled_undrained may not be empty - while let Some(eport) = self.get_state_mut().polled_undrained.pop() { - if let Some(msg) = self - .get_endpoint_mut(eport) - .recv() - .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? - { - // this endpoint MAY still have messages! check again in future - self.get_state_mut().polled_undrained.insert(eport); - return Ok(ReceivedMsg { recipient: eport, msg }); - } - } - - let state = self.get_state_mut(); - - state - .poll - .poll(&mut state.events, None) - .map_err(|_| MessengerRecvErr::PollingFailed)?; - for e in state.events.iter() { - state.polled_undrained.insert(Port::from_token(e.token())); - } - } + pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { + todo!() } } - -///////////////////////////////// -impl Debug for SolutionStorage { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.pad("Solutions: [")?; - for (subtree_id, &index) in self.subtree_id_to_index.iter() { - let sols = &self.subtree_solutions[index]; - f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; - } - f.pad("]") +impl From for MonitoredReader { + fn from(r: R) -> Self { + Self { r, bytes: 0 } } } -impl From for SyncErr { - fn from(e: EvalErr) -> SyncErr { - SyncErr::EvalErr(e) +impl MonitoredReader { + pub fn bytes_read(&self) -> usize { + self.bytes } } -impl From for SyncErr { - fn from(e: MessengerRecvErr) -> SyncErr { - SyncErr::MessengerRecvErr(e) +impl Read for MonitoredReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + let n = self.r.read(buf)?; + self.bytes += n; + Ok(n) } } -impl From for ConnectErr { - fn from(e: MessengerRecvErr) -> ConnectErr { - ConnectErr::MessengerRecvErr(e) +impl Into for SetupMsg { + fn into(self) -> Msg { + Msg::SetupMsg(self) } } -impl Default for Arena { - fn default() -> Self { - Self { storage: vec![] } +impl Debug for Predicate { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.pad("{")?; + for (port, &v) in self.assigned.iter() { + f.write_fmt(format_args!("{:?}=>{}, ", port, if v { 'T' } else { 'F' }))? + } + f.pad("}") } } -impl Arena { - pub fn alloc(&mut self, t: T) -> Port { - self.storage.push(t); - let l: u32 = self.storage.len().try_into().unwrap(); - Port::from_raw(l - 1u32) - } - pub fn get(&self, key: Port) -> Option<&T> { - self.storage.get(key.to_raw() as usize) - } - pub fn get_mut(&mut self, key: Port) -> Option<&mut T> { - self.storage.get_mut(key.to_raw() as usize) - } - pub fn type_convert(self, f: impl FnMut((Port, T)) -> X) -> Arena { - Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() } - } - pub fn iter(&self) -> impl Iterator { - self.keyspace().zip(self.storage.iter()) - } - pub fn len(&self) -> usize { - self.storage.len() - } - pub fn keyspace(&self) -> impl Iterator { - (0u32..self.storage.len().try_into().unwrap()).map(Port::from_raw) +impl StringLogger { + pub fn new(controller_id: ControllerId) -> Self { + Self(controller_id, String::default()) } } - -impl ChannelIdStream { - fn new(controller_id: ControllerId) -> Self { - Self { controller_id, next_channel_index: 0 } +impl Logger for StringLogger { + fn line_writer(&mut self) -> &mut dyn std::fmt::Write { + use std::fmt::Write; + let _ = write!(&mut self.1, "\nCID({}): ", self.0); + self } - fn next(&mut self) -> ChannelId { - self.next_channel_index += 1; - ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 } + fn dump_log(&self, w: &mut dyn std::io::Write) { + let _ = w.write(self.1.as_bytes()); } } - -impl MessengerState { - // does NOT guarantee that events is non-empty - fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> { - use PollDeadlineErr::*; - self.events.clear(); - let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; - self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; - Ok(()) +impl std::fmt::Write for StringLogger { + fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> { + self.1.write_str(s) } } -impl From for ConnectErr { - fn from(e: PollDeadlineErr) -> ConnectErr { - match e { - PollDeadlineErr::Timeout => ConnectErr::Timeout, - PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed, +impl IntStream { + fn next(&mut self) -> u32 { + if self.next == u32::MAX { + panic!("NO NEXT!") } + self.next += 1; + self.next - 1 } } - -impl std::ops::Not for Polarity { - type Output = Self; - fn not(self) -> Self::Output { - use Polarity::*; - match self { - Putter => Getter, - Getter => Putter, - } +impl IdManager { + fn next_port(&mut self) -> PortId { + let port_suffix = self.port_suffix_stream.next(); + let controller_id = self.controller_id; + PortId { controller_id, port_index: port_suffix } } -} - -impl Predicate { - // returns true IFF self.unify would return Equivalent OR FormerNotLatter - pub fn satisfies(&self, other: &Self) -> bool { - let mut s_it = self.assigned.iter(); - let mut s = if let Some(s) = s_it.next() { - s - } else { - return other.assigned.is_empty(); - }; - for (oid, ob) in other.assigned.iter() { - while s.0 < oid { - s = if let Some(s) = s_it.next() { - s - } else { - return false; - }; - } - if s.0 > oid || s.1 != ob { - return false; + fn new(controller_id: ControllerId) -> Self { + Self { controller_id, port_suffix_stream: Default::default() } + } +} +impl Endpoint { + fn try_recv(&mut self) -> Result, EndpointRecvErr> { + use EndpointRecvErr::*; + // populate inbox as much as possible + 'read_loop: loop { + match self.stream.read_to_end(&mut self.inbox) { + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop, + Ok(0) => break 'read_loop, + Ok(_) => (), + Err(_e) => return Err(BrokenEndpoint), } } - true - } - - /// Given self and other, two predicates, return the most general Predicate possible, N - /// such that n.satisfies(self) && n.satisfies(other). - /// If none exists Nonexistant is returned. - /// If the resulting predicate is equivlanet to self, other, or both, - /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. - /// otherwise New(N) is returned. - pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { - use CommonSatResult::*; - // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. - let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; - let [mut s, mut o] = [s_it.next(), o_it.next()]; - // lists of assignments in self but not other and vice versa. - let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; - loop { - match [s, o] { - [None, None] => break, - [None, Some(x)] => { - o_not_s.push(x); - o_not_s.extend(o_it); - break; - } - [Some(x), None] => { - s_not_o.push(x); - s_not_o.extend(s_it); - break; - } - [Some((sid, sb)), Some((oid, ob))] => { - if sid < oid { - // o is missing this element - s_not_o.push((sid, sb)); - s = s_it.next(); - } else if sid > oid { - // s is missing this element - o_not_s.push((oid, ob)); - o = o_it.next(); - } else if sb != ob { - assert_eq!(sid, oid); - // both predicates assign the variable but differ on the value - return Nonexistant; - } else { - // both predicates assign the variable to the same value - s = s_it.next(); - o = o_it.next(); - } - } + let mut monitored = MonitoredReader::from(&self.inbox[..]); + match bincode::deserialize_from(&mut monitored) { + Ok(msg) => { + let msg_size = monitored.bytes_read(); + self.inbox.drain(0..(msg_size.try_into().unwrap())); + Ok(Some(msg)) } - } - // Observed zero inconsistencies. A unified predicate exists... - match [s_not_o.is_empty(), o_not_s.is_empty()] { - [true, true] => Equivalent, // ... equivalent to both. - [false, true] => FormerNotLatter, // ... equivalent to self. - [true, false] => LatterNotFormer, // ... equivalent to other. - [false, false] => { - // ... which is the union of the predicates' assignments but - // is equivalent to neither self nor other. - let mut new = self.clone(); - for (&id, &b) in o_not_s { - new.assigned.insert(id, b); + Err(e) => match *e { + bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { + Ok(None) } - New(new) - } - } - } - - pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { - self.assigned - .iter() - .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) - } - - pub fn batch_assign_nones( - &mut self, - channel_ids: impl Iterator, - value: bool, - ) { - for channel_id in channel_ids { - self.assigned.entry(channel_id).or_insert(value); - } - } - pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option { - self.assigned.insert(channel_id, value) - } - pub fn union_with(&self, other: &Self) -> Option { - let mut res = self.clone(); - for (&channel_id, &assignment_1) in other.assigned.iter() { - match res.assigned.insert(channel_id, assignment_1) { - Some(assignment_2) if assignment_1 != assignment_2 => return None, - _ => {} - } + _ => Err(MalformedMessage), + // println!("SERDE ERRKIND {:?}", e); + // Err(MalformedMessage) + }, } - Some(res) - } - pub fn query(&self, x: ChannelId) -> Option { - self.assigned.get(&x).copied() } - pub fn new_trivial() -> Self { - Self { assigned: Default::default() } - } -} -impl Debug for Predicate { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - f.pad("{")?; - for (ChannelId { controller_id, channel_index }, &v) in self.assigned.iter() { - f.write_fmt(format_args!( - "({:?},{:?})=>{}, ", - controller_id, - channel_index, - if v { 'T' } else { 'F' } - ))? - } - f.pad("}") - } -} - -#[test] -fn pred_sat() { - use maplit::btreemap; - let mut c = ChannelIdStream::new(0); - let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); - let p = Predicate::new_trivial(); - let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; - let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; - let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; - let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } }; - - assert!(p.satisfies(&p)); - assert!(p_0t.satisfies(&p_0t)); - assert!(p_0f.satisfies(&p_0f)); - assert!(p_0f_3f.satisfies(&p_0f_3f)); - assert!(p_0f_3t.satisfies(&p_0f_3t)); - - assert!(p_0t.satisfies(&p)); - assert!(p_0f.satisfies(&p)); - assert!(p_0f_3f.satisfies(&p_0f)); - assert!(p_0f_3t.satisfies(&p_0f)); - - assert!(!p.satisfies(&p_0t)); - assert!(!p.satisfies(&p_0f)); - assert!(!p_0f.satisfies(&p_0t)); - assert!(!p_0t.satisfies(&p_0f)); - assert!(!p_0f_3f.satisfies(&p_0f_3t)); - assert!(!p_0f_3t.satisfies(&p_0f_3f)); - assert!(!p_0t.satisfies(&p_0f_3f)); - assert!(!p_0f.satisfies(&p_0f_3f)); - assert!(!p_0t.satisfies(&p_0f_3t)); - assert!(!p_0f.satisfies(&p_0f_3t)); -} - -#[test] -fn pred_common_sat() { - use maplit::btreemap; - use CommonSatResult::*; - - let mut c = ChannelIdStream::new(0); - let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); - let p = Predicate::new_trivial(); - let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; - let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; - let p_3f = Predicate { assigned: btreemap! { ch[3] => false } }; - let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; - let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } }; - - assert_eq![p.common_satisfier(&p), Equivalent]; - assert_eq![p_0t.common_satisfier(&p_0t), Equivalent]; - - assert_eq![p.common_satisfier(&p_0t), LatterNotFormer]; - assert_eq![p_0t.common_satisfier(&p), FormerNotLatter]; - - assert_eq![p_0t.common_satisfier(&p_0f), Nonexistant]; - assert_eq![p_0f_3t.common_satisfier(&p_0f_3f), Nonexistant]; - assert_eq![p_0f_3t.common_satisfier(&p_3f), Nonexistant]; - assert_eq![p_3f.common_satisfier(&p_0f_3t), Nonexistant]; - - assert_eq![p_0f.common_satisfier(&p_3f), New(p_0f_3f)]; -} + fn send(&mut self, msg: &T) -> Result<(), ()> { + bincode::serialize_into(&mut self.stream, msg).map_err(drop) + } +} +impl Connector { + fn get_logger(&self) -> &dyn Logger { + &*self.logger + } +} + +// #[derive(Debug)] +// pub enum Connector { +// Unconfigured(Unconfigured), +// Configured(Configured), +// Connected(Connected), // TODO consider boxing. currently takes up a lot of stack space +// } +// #[derive(Debug)] +// pub struct Unconfigured { +// pub controller_id: ControllerId, +// } +// #[derive(Debug)] +// pub struct Configured { +// controller_id: ControllerId, +// polarities: Vec, +// bindings: HashMap, +// protocol_description: Arc, +// main_component: Vec, +// logger: String, +// } +// #[derive(Debug)] +// pub struct Connected { +// native_interface: Vec<(PortId, Polarity)>, +// sync_batches: Vec, +// // controller is cooperatively scheduled with the native application +// // (except for transport layer behind Endpoints, which are managed by the OS) +// // control flow is passed to the controller during methods on Connector (primarily, connect and sync). +// controller: Controller, +// } + +// #[derive(Debug, Copy, Clone)] +// pub enum PortBinding { +// Native, +// Active(SocketAddr), +// Passive(SocketAddr), +// } + +// #[derive(Debug)] +// struct Arena { +// storage: Vec, +// } + +// #[derive(Debug)] +// struct ReceivedMsg { +// recipient: PortId, +// msg: Msg, +// } + +// #[derive(Debug)] +// struct MessengerState { +// poll: Poll, +// events: Events, +// delayed: Vec, +// undelayed: Vec, +// polled_undrained: IndexSet, +// } +// #[derive(Debug)] +// struct ChannelIdStream { +// controller_id: ControllerId, +// next_channel_index: ChannelIndex, +// } + +// #[derive(Debug)] +// struct Controller { +// protocol_description: Arc, +// inner: ControllerInner, +// ephemeral: ControllerEphemeral, +// unrecoverable_error: Option, // prevents future calls to Sync +// } +// #[derive(Debug)] +// struct ControllerInner { +// round_index: usize, +// channel_id_stream: ChannelIdStream, +// endpoint_exts: Arena, +// messenger_state: MessengerState, +// mono_n: MonoN, // state at next round start +// mono_ps: Vec, // state at next round start +// family: ControllerFamily, +// logger: String, +// } + +// /// This structure has its state entirely reset between synchronous rounds +// #[derive(Debug, Default)] +// struct ControllerEphemeral { +// solution_storage: SolutionStorage, +// poly_n: Option, +// poly_ps: Vec, +// mono_ps: Vec, +// port_to_holder: HashMap, +// } + +// #[derive(Debug)] +// struct ControllerFamily { +// parent_port: Option, +// children_ports: Vec, +// } + +// #[derive(Debug)] +// pub(crate) enum SyncRunResult { +// BlockingForRecv, +// AllBranchesComplete, +// NoBranches, +// } + +// // Used to identify poly actors +// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +// enum PolyId { +// N, +// P { index: usize }, +// } + +// #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +// pub(crate) enum SubtreeId { +// PolyN, +// PolyP { index: usize }, +// ChildController { port: PortId }, +// } + +// pub(crate) struct MonoPContext<'a> { +// inner: &'a mut ControllerInner, +// ports: &'a mut HashSet, +// mono_ps: &'a mut Vec, +// } +// pub(crate) struct PolyPContext<'a> { +// my_subtree_id: SubtreeId, +// inner: &'a mut ControllerInner, +// solution_storage: &'a mut SolutionStorage, +// } +// impl PolyPContext<'_> { +// #[inline(always)] +// fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { +// let Self { solution_storage, my_subtree_id, inner } = self; +// PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } +// } +// } +// struct BranchPContext<'m, 'r> { +// m_ctx: PolyPContext<'m>, +// ports: &'r HashSet, +// predicate: &'r Predicate, +// inbox: &'r HashMap, +// } + +// #[derive(Default)] +// pub(crate) struct SolutionStorage { +// old_local: HashSet, +// new_local: HashSet, +// // this pair acts as SubtreeId -> HashSet which is friendlier to iteration +// subtree_solutions: Vec>, +// subtree_id_to_index: HashMap, +// } + +// trait Messengerlike { +// fn get_state_mut(&mut self) -> &mut MessengerState; +// fn get_endpoint_mut(&mut self, eport: PortId) -> &mut Endpoint; + +// fn delay(&mut self, received: ReceivedMsg) { +// self.get_state_mut().delayed.push(received); +// } +// fn undelay_all(&mut self) { +// let MessengerState { delayed, undelayed, .. } = self.get_state_mut(); +// undelayed.extend(delayed.drain(..)) +// } + +// fn send(&mut self, to: PortId, msg: Msg) -> Result<(), EndpointErr> { +// self.get_endpoint_mut(to).send(msg) +// } + +// // attempt to receive a message from one of the endpoints before the deadline +// fn recv(&mut self, deadline: Instant) -> Result, MessengerRecvErr> { +// // try get something buffered +// if let Some(x) = self.get_state_mut().undelayed.pop() { +// return Ok(Some(x)); +// } + +// loop { +// // polled_undrained may not be empty +// while let Some(eport) = self.get_state_mut().polled_undrained.pop() { +// if let Some(msg) = self +// .get_endpoint_mut(eport) +// .recv() +// .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? +// { +// // this endpoint MAY still have messages! check again in future +// self.get_state_mut().polled_undrained.insert(eport); +// return Ok(Some(ReceivedMsg { recipient: eport, msg })); +// } +// } + +// let state = self.get_state_mut(); +// match state.poll_events(deadline) { +// Ok(()) => { +// for e in state.events.iter() { +// state.polled_undrained.insert(PortId::from_token(e.token())); +// } +// } +// Err(PollDeadlineErr::PollingFailed) => return Err(MessengerRecvErr::PollingFailed), +// Err(PollDeadlineErr::Timeout) => return Ok(None), +// } +// } +// } +// fn recv_blocking(&mut self) -> Result { +// // try get something buffered +// if let Some(x) = self.get_state_mut().undelayed.pop() { +// return Ok(x); +// } + +// loop { +// // polled_undrained may not be empty +// while let Some(eport) = self.get_state_mut().polled_undrained.pop() { +// if let Some(msg) = self +// .get_endpoint_mut(eport) +// .recv() +// .map_err(|e| MessengerRecvErr::EndpointErr(eport, e))? +// { +// // this endpoint MAY still have messages! check again in future +// self.get_state_mut().polled_undrained.insert(eport); +// return Ok(ReceivedMsg { recipient: eport, msg }); +// } +// } + +// let state = self.get_state_mut(); + +// state +// .poll +// .poll(&mut state.events, None) +// .map_err(|_| MessengerRecvErr::PollingFailed)?; +// for e in state.events.iter() { +// state.polled_undrained.insert(PortId::from_token(e.token())); +// } +// } +// } +// } + +// ///////////////////////////////// +// impl Debug for SolutionStorage { +// fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { +// f.pad("Solutions: [")?; +// for (subtree_id, &index) in self.subtree_id_to_index.iter() { +// let sols = &self.subtree_solutions[index]; +// f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; +// } +// f.pad("]") +// } +// } +// impl From for SyncErr { +// fn from(e: EvalErr) -> SyncErr { +// SyncErr::EvalErr(e) +// } +// } +// impl From for SyncErr { +// fn from(e: MessengerRecvErr) -> SyncErr { +// SyncErr::MessengerRecvErr(e) +// } +// } +// impl From for ConnectErr { +// fn from(e: MessengerRecvErr) -> ConnectErr { +// ConnectErr::MessengerRecvErr(e) +// } +// } +// impl Default for Arena { +// fn default() -> Self { +// Self { storage: vec![] } +// } +// } +// impl Arena { +// pub fn alloc(&mut self, t: T) -> PortId { +// self.storage.push(t); +// let l: u32 = self.storage.len().try_into().unwrap(); +// PortId::from_raw(l - 1u32) +// } +// pub fn get(&self, key: PortId) -> Option<&T> { +// self.storage.get(key.to_raw() as usize) +// } +// pub fn get_mut(&mut self, key: PortId) -> Option<&mut T> { +// self.storage.get_mut(key.to_raw() as usize) +// } +// pub fn type_convert(self, f: impl FnMut((PortId, T)) -> X) -> Arena { +// Arena { storage: self.keyspace().zip(self.storage.into_iter()).map(f).collect() } +// } +// pub fn iter(&self) -> impl Iterator { +// self.keyspace().zip(self.storage.iter()) +// } +// pub fn len(&self) -> usize { +// self.storage.len() +// } +// pub fn keyspace(&self) -> impl Iterator { +// (0u32..self.storage.len().try_into().unwrap()).map(PortId::from_raw) +// } +// } + +// impl ChannelIdStream { +// fn new(controller_id: ControllerId) -> Self { +// Self { controller_id, next_channel_index: 0 } +// } +// fn next(&mut self) -> ChannelId { +// self.next_channel_index += 1; +// ChannelId { controller_id: self.controller_id, channel_index: self.next_channel_index - 1 } +// } +// } + +// impl MessengerState { +// // does NOT guarantee that events is non-empty +// fn poll_events(&mut self, deadline: Instant) -> Result<(), PollDeadlineErr> { +// use PollDeadlineErr::*; +// self.events.clear(); +// let poll_timeout = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; +// self.poll.poll(&mut self.events, Some(poll_timeout)).map_err(|_| PollingFailed)?; +// Ok(()) +// } +// } +// impl From for ConnectErr { +// fn from(e: PollDeadlineErr) -> ConnectErr { +// match e { +// PollDeadlineErr::Timeout => ConnectErr::Timeout, +// PollDeadlineErr::PollingFailed => ConnectErr::PollingFailed, +// } +// } +// } + +// impl std::ops::Not for Polarity { +// type Output = Self; +// fn not(self) -> Self::Output { +// use Polarity::*; +// match self { +// Putter => Getter, +// Getter => Putter, +// } +// } +// } + +// impl Predicate { +// // returns true IFF self.unify would return Equivalent OR FormerNotLatter +// pub fn satisfies(&self, other: &Self) -> bool { +// let mut s_it = self.assigned.iter(); +// let mut s = if let Some(s) = s_it.next() { +// s +// } else { +// return other.assigned.is_empty(); +// }; +// for (oid, ob) in other.assigned.iter() { +// while s.0 < oid { +// s = if let Some(s) = s_it.next() { +// s +// } else { +// return false; +// }; +// } +// if s.0 > oid || s.1 != ob { +// return false; +// } +// } +// true +// } + +// /// Given self and other, two predicates, return the most general Predicate possible, N +// /// such that n.satisfies(self) && n.satisfies(other). +// /// If none exists Nonexistant is returned. +// /// If the resulting predicate is equivlanet to self, other, or both, +// /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. +// /// otherwise New(N) is returned. +// pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { +// use CommonSatResult::*; +// // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. +// let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; +// let [mut s, mut o] = [s_it.next(), o_it.next()]; +// // lists of assignments in self but not other and vice versa. +// let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; +// loop { +// match [s, o] { +// [None, None] => break, +// [None, Some(x)] => { +// o_not_s.push(x); +// o_not_s.extend(o_it); +// break; +// } +// [Some(x), None] => { +// s_not_o.push(x); +// s_not_o.extend(s_it); +// break; +// } +// [Some((sid, sb)), Some((oid, ob))] => { +// if sid < oid { +// // o is missing this element +// s_not_o.push((sid, sb)); +// s = s_it.next(); +// } else if sid > oid { +// // s is missing this element +// o_not_s.push((oid, ob)); +// o = o_it.next(); +// } else if sb != ob { +// assert_eq!(sid, oid); +// // both predicates assign the variable but differ on the value +// return Nonexistant; +// } else { +// // both predicates assign the variable to the same value +// s = s_it.next(); +// o = o_it.next(); +// } +// } +// } +// } +// // Observed zero inconsistencies. A unified predicate exists... +// match [s_not_o.is_empty(), o_not_s.is_empty()] { +// [true, true] => Equivalent, // ... equivalent to both. +// [false, true] => FormerNotLatter, // ... equivalent to self. +// [true, false] => LatterNotFormer, // ... equivalent to other. +// [false, false] => { +// // ... which is the union of the predicates' assignments but +// // is equivalent to neither self nor other. +// let mut new = self.clone(); +// for (&id, &b) in o_not_s { +// new.assigned.insert(id, b); +// } +// New(new) +// } +// } +// } + +// pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { +// self.assigned +// .iter() +// .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) +// } + +// pub fn batch_assign_nones( +// &mut self, +// channel_ids: impl Iterator, +// value: bool, +// ) { +// for channel_id in channel_ids { +// self.assigned.entry(channel_id).or_insert(value); +// } +// } +// pub fn replace_assignment(&mut self, channel_id: ChannelId, value: bool) -> Option { +// self.assigned.insert(channel_id, value) +// } +// pub fn union_with(&self, other: &Self) -> Option { +// let mut res = self.clone(); +// for (&channel_id, &assignment_1) in other.assigned.iter() { +// match res.assigned.insert(channel_id, assignment_1) { +// Some(assignment_2) if assignment_1 != assignment_2 => return None, +// _ => {} +// } +// } +// Some(res) +// } +// pub fn query(&self, x: ChannelId) -> Option { +// self.assigned.get(&x).copied() +// } +// pub fn new_trivial() -> Self { +// Self { assigned: Default::default() } +// } +// } + +// #[test] +// fn pred_sat() { +// use maplit::btreemap; +// let mut c = ChannelIdStream::new(0); +// let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); +// let p = Predicate::new_trivial(); +// let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; +// let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; +// let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; +// let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } }; + +// assert!(p.satisfies(&p)); +// assert!(p_0t.satisfies(&p_0t)); +// assert!(p_0f.satisfies(&p_0f)); +// assert!(p_0f_3f.satisfies(&p_0f_3f)); +// assert!(p_0f_3t.satisfies(&p_0f_3t)); + +// assert!(p_0t.satisfies(&p)); +// assert!(p_0f.satisfies(&p)); +// assert!(p_0f_3f.satisfies(&p_0f)); +// assert!(p_0f_3t.satisfies(&p_0f)); + +// assert!(!p.satisfies(&p_0t)); +// assert!(!p.satisfies(&p_0f)); +// assert!(!p_0f.satisfies(&p_0t)); +// assert!(!p_0t.satisfies(&p_0f)); +// assert!(!p_0f_3f.satisfies(&p_0f_3t)); +// assert!(!p_0f_3t.satisfies(&p_0f_3f)); +// assert!(!p_0t.satisfies(&p_0f_3f)); +// assert!(!p_0f.satisfies(&p_0f_3f)); +// assert!(!p_0t.satisfies(&p_0f_3t)); +// assert!(!p_0f.satisfies(&p_0f_3t)); +// } + +// #[test] +// fn pred_common_sat() { +// use maplit::btreemap; +// use CommonSatResult::*; + +// let mut c = ChannelIdStream::new(0); +// let ch = std::iter::repeat_with(move || c.next()).take(5).collect::>(); +// let p = Predicate::new_trivial(); +// let p_0t = Predicate { assigned: btreemap! { ch[0] => true } }; +// let p_0f = Predicate { assigned: btreemap! { ch[0] => false } }; +// let p_3f = Predicate { assigned: btreemap! { ch[3] => false } }; +// let p_0f_3f = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => false } }; +// let p_0f_3t = Predicate { assigned: btreemap! { ch[0] => false, ch[3] => true } }; + +// assert_eq![p.common_satisfier(&p), Equivalent]; +// assert_eq![p_0t.common_satisfier(&p_0t), Equivalent]; + +// assert_eq![p.common_satisfier(&p_0t), LatterNotFormer]; +// assert_eq![p_0t.common_satisfier(&p), FormerNotLatter]; + +// assert_eq![p_0t.common_satisfier(&p_0f), Nonexistant]; +// assert_eq![p_0f_3t.common_satisfier(&p_0f_3f), Nonexistant]; +// assert_eq![p_0f_3t.common_satisfier(&p_3f), Nonexistant]; +// assert_eq![p_3f.common_satisfier(&p_0f_3t), Nonexistant]; + +// assert_eq![p_0f.common_satisfier(&p_3f), New(p_0f_3f)]; +// }