// Core declarations for a `Connector`: wire messages (`Msg`, `SetupMsg`, `CommMsg`), endpoint I/O
// (`Endpoint`, `EndpointManager`), id allocation (`IntStream`, `IdManager`), logging (`Logger`,
// `StringLogger`) and `Predicate`, whose `assigned` map receives `PortId` -> `bool` entries
// (see `replace_assignment`).
//
// NOTE(review): this text looks damaged relative to normal Rust source - TODO confirm against
// version control before relying on it:
//   * line breaks appear to have been collapsed, so every `//` comment now runs to the end of a
//     very long physical line and lexically swallows the code that follows it;
//   * the contents of angle brackets appear to have been stripped (e.g. `inbox: Vec`,
//     `impl From for MonitoredReader`, `-> Result, EndpointError>`), so generic parameters are
//     missing and cannot be recovered from this view alone.
//
// This physical line: sub-module declarations and imports, the routing ids (`LocalComponentId`,
// `Route`), the serializable message types (`MyPortInfo`, `Decision`, `Msg`, `SetupMsg`,
// `CommMsg`, `CommMsgContents`), `CommonSatResult` (the result type of
// `Predicate::common_satisfier`), then `Endpoint`, `IntStream`, `IdManager`, `ProtoComponent`,
// the `Logger` trait, `EndpointSetup` and the opening of `EndpointExt`.
mod communication; mod error; mod setup2; #[cfg(test)] mod my_tests; use crate::common::*; use error::*; #[derive(Clone, Copy, Debug)] pub enum LocalComponentId { Native, Proto { index: usize }, } #[derive(Debug, Clone, Copy)] pub enum Route { LocalComponent(LocalComponentId), Endpoint { index: usize }, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct MyPortInfo { polarity: Polarity, port: PortId, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum Decision { Failure, Success(Predicate), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum Msg { SetupMsg(SetupMsg), CommMsg(CommMsg), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum SetupMsg { MyPortInfo(MyPortInfo), LeaderEcho { maybe_leader: ControllerId }, LeaderAnnounce { leader: ControllerId }, YouAreMyParent, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct CommMsg { pub round_index: usize, pub contents: CommMsgContents, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum CommMsgContents { SendPayload { payload_predicate: Predicate, payload: Payload }, Elaborate { partial_oracle: Predicate }, // SINKWARD Failure, // SINKWARD Announce { decision: Decision }, // SINKAWAYS } #[derive(Debug, PartialEq)] pub enum CommonSatResult { FormerNotLatter, LatterNotFormer, Equivalent, New(Predicate), Nonexistant, } pub struct Endpoint { inbox: Vec, stream: TcpStream, } #[derive(Debug, Default)] pub struct IntStream { next: u32, } #[derive(Debug)] pub struct IdManager { controller_id: ControllerId, port_suffix_stream: IntStream, } #[derive(Debug)] pub struct ProtoComponent { state: ComponentState, ports: HashSet, } pub trait Logger: Debug { fn line_writer(&mut self) -> &mut dyn std::fmt::Write; fn dump_log(&self, w: &mut dyn std::io::Write); } #[derive(Debug, Clone)] pub struct EndpointSetup { pub sock_addr: SocketAddr, pub is_active: bool, } #[derive(Debug)] pub struct EndpointExt { 
// Continues `EndpointExt`, then declares `Neighborhood`, `MemInMsg`, `EndpointManager`,
// `PortInfo`, `Connector`, `ConnectorPhased`, `StringLogger`, `Predicate`, `MonitoredReader` and
// the two proto-component context structs (`SyncProtoContext`, `NonsyncProtoContext`).
// The `MonoPContext` / `PolyPContext` items at the end were already commented out in the original.
endpoint: Endpoint, inp_for_emerging_msgs: PortId, } #[derive(Debug)] pub struct Neighborhood { parent: Option, children: Vec, // ordered, deduplicated } #[derive(Debug)] pub struct MemInMsg { inp: PortId, msg: Payload, } #[derive(Debug)] pub struct EndpointManager { // invariants: // 1. endpoint N is registered READ | WRITE with poller // 2. Events is empty poll: Poll, events: Events, polled_undrained: IndexSet, delayed_messages: Vec<(usize, Msg)>, undelayed_messages: Vec<(usize, Msg)>, endpoint_exts: Vec, } #[derive(Debug, Default)] pub struct PortInfo { polarities: HashMap, peers: HashMap, routes: HashMap, } #[derive(Debug)] pub struct Connector { logger: Box, proto_description: Arc, id_manager: IdManager, native_ports: HashSet, proto_components: Vec, port_info: PortInfo, phased: ConnectorPhased, } #[derive(Debug)] pub enum ConnectorPhased { Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16, }, Communication { endpoint_manager: EndpointManager, neighborhood: Neighborhood, mem_inbox: Vec, native_actor: NativeActor, // sync invariant: in Nonsync state }, } #[derive(Debug)] pub struct StringLogger(ControllerId, String); #[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct Predicate { pub assigned: BTreeMap, } pub struct MonitoredReader { bytes: usize, r: R, } pub struct SyncProtoContext<'a> { connector: &'a mut Connector, proto_component_index: usize, } pub struct NonsyncProtoContext<'a> { connector: &'a mut Connector, proto_component_index: usize, } // pub struct MonoPContext<'a> { // inner: &'a mut ControllerInner, // ports: &'a mut HashSet, // mono_ps: &'a mut Vec, // } // pub struct PolyPContext<'a> { // my_subtree_id: SubtreeId, // inner: &'a mut Connector, // solution_storage: &'a mut SolutionStorage, // } // impl PolyPContext<'_> { // #[inline(always)] // fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { // let Self { solution_storage, my_subtree_id, inner } = self; // PolyPContext { 
// Finishes the commented-out context code, then declares `SolutionStorage`, `SyncRunResult`,
// `PolyId`, `SubtreeId`, `SyncBatch`, `NativeActor` and `NativeBranch`.
// `impl EndpointManager` starts on this line:
//   * `send_to` forwards `msg` to `endpoint_exts[index].endpoint.send`; the indexing panics if
//     `index` is out of range.
//   * `try_recv_any` returns the next `(endpoint index, Msg)` pair. It first takes anything left
//     in `undelayed_messages` (with `pop`, i.e. the most recently pushed entry first), then tries
//     the endpoints queued in `polled_undrained`, and otherwise polls for up to the time left
//     before `deadline`. It fails with `Timeout` once the deadline has passed, `PollFailed` if
//     polling errors, or `EndpointError { error, index }` if an endpoint's `try_recv` fails.
solution_storage, my_subtree_id: *my_subtree_id, inner } // } // } // struct BranchPContext<'m, 'r> { // m_ctx: PolyPContext<'m>, // ports: &'r HashSet, // predicate: &'r Predicate, // inbox: &'r HashMap, // } #[derive(Default)] pub struct SolutionStorage { old_local: HashSet, new_local: HashSet, // this pair acts as SubtreeId -> HashSet which is friendlier to iteration subtree_solutions: Vec>, subtree_id_to_index: HashMap, } #[derive(Debug)] pub enum SyncRunResult { BlockingForRecv, AllBranchesComplete, NoBranches, } #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub enum PolyId { N, P { index: usize }, } #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub enum SubtreeId { PolyN, PolyP { index: usize }, ChildController { port: PortId }, } #[derive(Debug, Default)] pub struct SyncBatch { to_put: HashMap, to_get: HashSet, } #[derive(Debug)] pub enum NativeActor { Nonsync { sync_result_branch: Option, // invariant: sync_result_branch.to_get.is_empty() next_batches: Vec, // invariant: nonempty }, Sync { branches: HashMap, }, } #[derive(Debug)] pub struct NativeBranch { batch_index: usize, gotten: HashMap, to_get: HashSet, } //////////////// impl EndpointManager { fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { self.endpoint_exts[index].endpoint.send(msg) } fn try_recv_any( &mut self, logger: &mut dyn Logger, deadline: Instant, ) -> Result<(usize, Msg), TryRecyAnyError> { use TryRecyAnyError::*; // 1. try messages already buffered if let Some(x) = self.undelayed_messages.pop() { return Ok(x); } loop { // 2. try read a message from an enpoint that previously raised an event while let Some(index) = self.polled_undrained.pop() { let endpoint = &mut self.endpoint_exts[index].endpoint; if let Some(msg) = endpoint.try_recv().map_err(|error| EndpointError { error, index })? { if !endpoint.inbox.is_empty() { // there may be another message waiting! 
// Rest of `try_recv_any`: an endpoint index is re-queued in `polled_undrained` when its inbox
// still holds bytes after a message was extracted, and the token index of every poll event is
// queued (and logged) after each poll.
// `undelay_all` moves all `delayed_messages` into `undelayed_messages`: a swap when the latter is
// empty, otherwise an append via `drain`.
// Also on this line:
//   * `Debug for Endpoint` prints only the `inbox` field.
//   * `MonitoredReader` wraps a reader and accumulates the byte counts returned by its `read`;
//     `bytes_read` reports the total.
//   * `StringLogger::line_writer` appends "\nCID(<id>): " to the log string and returns the logger
//     itself as the writer; its `write_str` appends to the same string.
//   * `IntStream::next` returns the current counter and then increments it; it panics with
//     "NO NEXT!" when the counter equals `u32::MAX`.
//   * `IdManager::next_port` builds a `PortId` from the controller id and the next suffix.
// NOTE(review): `StringLogger::dump_log` discards the result of `Write::write`, which is allowed
// to write only part of the buffer; `write_all` is the std call that writes everything - confirm
// whether a truncated dump is acceptable here.
// NOTE(review): the hand-written `Into` impl for `SetupMsg` could instead be a `From` impl on
// `Msg`, which provides `Into` automatically.
self.polled_undrained.insert(index); } return Ok((index, msg)); } } // 3. No message yet. poll! let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; for event in self.events.iter() { log!(logger, "Poll event {:?}", event); let Token(index) = event.token(); self.polled_undrained.insert(index); } } } fn undelay_all(&mut self) { if self.undelayed_messages.is_empty() { // fast path std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages); return; } // slow path self.undelayed_messages.extend(self.delayed_messages.drain(..)); } } impl Debug for Endpoint { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() } } impl From for MonitoredReader { fn from(r: R) -> Self { Self { r, bytes: 0 } } } impl MonitoredReader { pub fn bytes_read(&self) -> usize { self.bytes } } impl Read for MonitoredReader { fn read(&mut self, buf: &mut [u8]) -> Result { let n = self.r.read(buf)?; self.bytes += n; Ok(n) } } impl Into for SetupMsg { fn into(self) -> Msg { Msg::SetupMsg(self) } } impl StringLogger { pub fn new(controller_id: ControllerId) -> Self { Self(controller_id, String::default()) } } impl Logger for StringLogger { fn line_writer(&mut self) -> &mut dyn std::fmt::Write { use std::fmt::Write; let _ = write!(&mut self.1, "\nCID({}): ", self.0); self } fn dump_log(&self, w: &mut dyn std::io::Write) { let _ = w.write(self.1.as_bytes()); } } impl std::fmt::Write for StringLogger { fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> { self.1.write_str(s) } } impl IntStream { fn next(&mut self) -> u32 { if self.next == u32::MAX { panic!("NO NEXT!") } self.next += 1; self.next - 1 } } impl IdManager { fn next_port(&mut self) -> PortId { let port_suffix = self.port_suffix_stream.next(); let controller_id = self.controller_id; PortId { controller_id, port_index: port_suffix } } fn 
// `IdManager::new` stores the controller id and starts the port-suffix stream at its default.
// `Endpoint::try_recv`: appends whatever the stream yields to `inbox` (stopping on `WouldBlock` or
// a zero-length read; any other read error becomes `BrokenEndpoint`), then tries to deserialize
// one message from the front of `inbox` with bincode. On success the consumed bytes (as counted by
// `MonitoredReader`) are drained and `Ok(Some(msg))` is returned; a bincode I/O error of kind
// `UnexpectedEof` is treated as "no complete message yet" (`Ok(None)`); any other bincode error is
// reported as `MalformedMessage`.
// `Endpoint::send` serializes `msg` directly into the stream and drops the error detail.
// `Connector::print_state` writes a header, the log dump and a `{:#?}` dump of `self` to locked
// stdout; it panics (`unwrap`) if a `writeln!` fails.
// `Debug for SolutionStorage` prints each subtree id together with its solutions.
// `Predicate::satisfies` starts here: it walks both sorted assignment maps in step and returns
// true only if every assignment in `other` is also present in `self` with the same value (an empty
// `self` satisfies only an empty `other`).
// NOTE(review): the existing comment on `satisfies` refers to `self.unify`, which is not defined
// in this view; `common_satisfier` is the function that returns `Equivalent` /
// `FormerNotLatter` - presumably a rename, confirm.
new(controller_id: ControllerId) -> Self { Self { controller_id, port_suffix_stream: Default::default() } } } impl Endpoint { fn try_recv(&mut self) -> Result, EndpointError> { use EndpointError::*; // populate inbox as much as possible 'read_loop: loop { match self.stream.read_to_end(&mut self.inbox) { Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop, Ok(0) => break 'read_loop, Ok(_) => (), Err(_e) => return Err(BrokenEndpoint), } } let mut monitored = MonitoredReader::from(&self.inbox[..]); match bincode::deserialize_from(&mut monitored) { Ok(msg) => { let msg_size = monitored.bytes_read(); self.inbox.drain(0..(msg_size.try_into().unwrap())); Ok(Some(msg)) } Err(e) => match *e { bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { Ok(None) } _ => Err(MalformedMessage), // println!("SERDE ERRKIND {:?}", e); // Err(MalformedMessage) }, } } fn send(&mut self, msg: &T) -> Result<(), ()> { bincode::serialize_into(&mut self.stream, msg).map_err(drop) } } impl Connector { pub fn get_logger(&self) -> &dyn Logger { &*self.logger } pub fn print_state(&self) { let stdout = std::io::stdout(); let mut lock = stdout.lock(); writeln!( lock, "--- Connector with ControllerId={:?}.\n::LOG_DUMP:\n", self.id_manager.controller_id ) .unwrap(); self.get_logger().dump_log(&mut lock); writeln!(lock, "DEBUG_PRINT:\n{:#?}\n", self).unwrap(); } } impl Debug for SolutionStorage { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.pad("Solutions: [")?; for (subtree_id, &index) in self.subtree_id_to_index.iter() { let sols = &self.subtree_solutions[index]; f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; } f.pad("]") } } impl Predicate { // returns true IFF self.unify would return Equivalent OR FormerNotLatter pub fn satisfies(&self, other: &Self) -> bool { let mut s_it = self.assigned.iter(); let mut s = if let Some(s) = s_it.next() { s } else { return other.assigned.is_empty(); }; for (oid, ob) in other.assigned.iter() { 
// Rest of `satisfies`, followed by `common_satisfier`, which merges the two sorted assignment
// sequences: keys present on one side only are collected in `s_not_o` / `o_not_s`, and a key that
// both sides assign to different values ends the search immediately with `Nonexistant`.
// The emptiness of the two "only on one side" lists then selects the result variant.
while s.0 < oid { s = if let Some(s) = s_it.next() { s } else { return false; }; } if s.0 > oid || s.1 != ob { return false; } } true } /// Given self and other, two predicates, return the most general Predicate possible, N /// such that n.satisfies(self) && n.satisfies(other). /// If none exists Nonexistant is returned. /// If the resulting predicate is equivlanet to self, other, or both, /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. /// otherwise New(N) is returned. pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { use CommonSatResult::*; // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; let [mut s, mut o] = [s_it.next(), o_it.next()]; // lists of assignments in self but not other and vice versa. let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; loop { match [s, o] { [None, None] => break, [None, Some(x)] => { o_not_s.push(x); o_not_s.extend(o_it); break; } [Some(x), None] => { s_not_o.push(x); s_not_o.extend(s_it); break; } [Some((sid, sb)), Some((oid, ob))] => { if sid < oid { // o is missing this element s_not_o.push((sid, sb)); s = s_it.next(); } else if sid > oid { // s is missing this element o_not_s.push((oid, ob)); o = o_it.next(); } else if sb != ob { assert_eq!(sid, oid); // both predicates assign the variable but differ on the value return Nonexistant; } else { // both predicates assign the variable to the same value s = s_it.next(); o = o_it.next(); } } } } // Observed zero inconsistencies. A unified predicate exists... match [s_not_o.is_empty(), o_not_s.is_empty()] { [true, true] => Equivalent, // ... equivalent to both. [false, true] => FormerNotLatter, // ... equivalent to self. [true, false] => LatterNotFormer, // ... equivalent to other. [false, false] => { // ... which is the union of the predicates' assignments but // is equivalent to neither self nor other. 
// End of `common_satisfier`: when each side has assignments the other lacks, the result is `New`,
// built from a clone of `self` plus the assignments found only in `other`.
// Remaining `Predicate` helpers:
//   * `iter_matching(value)` yields the keys whose assignment equals `value`.
//   * `batch_assign_nones` assigns `value` only to ids that have no assignment yet.
//   * `replace_assignment` overwrites an assignment and returns the previous one, if any.
//   * `union_with` returns `None` if the two predicates disagree on any key, else their union.
//   * `query` looks up a single key; `new_trivial` builds the predicate with no assignments.
let mut new = self.clone(); for (&id, &b) in o_not_s { new.assigned.insert(id, b); } New(new) } } } pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { self.assigned .iter() .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) } pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator, value: bool) { for channel_id in channel_ids { self.assigned.entry(channel_id).or_insert(value); } } pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option { self.assigned.insert(channel_id, value) } pub fn union_with(&self, other: &Self) -> Option { let mut res = self.clone(); for (&channel_id, &assignment_1) in other.assigned.iter() { match res.assigned.insert(channel_id, assignment_1) { Some(assignment_2) if assignment_1 != assignment_2 => return None, _ => {} } } Some(res) } pub fn query(&self, x: PortId) -> Option { self.assigned.get(&x).copied() } pub fn new_trivial() -> Self { Self { assigned: Default::default() } } }