mod communication; pub mod error; mod setup2; #[cfg(test)] mod tests; use crate::common::*; use error::*; #[derive( Debug, Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] pub struct FiringVar(PortId); #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] pub enum LocalComponentId { Native, Proto(ProtoComponentId), } #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] pub enum Route { LocalComponent(LocalComponentId), Endpoint { index: usize }, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct MyPortInfo { polarity: Polarity, port: PortId, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum Decision { Failure, Success(Predicate), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum Msg { SetupMsg(SetupMsg), CommMsg(CommMsg), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum SetupMsg { MyPortInfo(MyPortInfo), LeaderEcho { maybe_leader: ControllerId }, LeaderAnnounce { leader: ControllerId }, YouAreMyParent, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct CommMsg { pub round_index: usize, pub contents: CommMsgContents, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum CommMsgContents { SendPayload(SendPayloadMsg), Suggest { suggestion: Decision }, // SINKWARD Announce { decision: Decision }, // SINKAWAYS } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct SendPayloadMsg { predicate: Predicate, payload: Payload, } #[derive(Debug, PartialEq)] pub enum CommonSatResult { FormerNotLatter, LatterNotFormer, Equivalent, New(Predicate), Nonexistant, } pub struct Endpoint { inbox: Vec, stream: TcpStream, } #[derive(Debug, Clone)] pub struct ProtoComponent { state: ComponentState, ports: HashSet, } pub trait Logger: Debug { fn line_writer(&mut self) -> &mut dyn std::fmt::Write; fn dump_log(&self, w: &mut dyn std::io::Write); } #[derive(Debug, Clone)] pub struct EndpointSetup { pub sock_addr: SocketAddr, pub is_active: bool, } #[derive(Debug)] pub struct EndpointExt { endpoint: Endpoint, getter_for_incoming: PortId, } #[derive(Debug)] pub struct Neighborhood { parent: Option, children: Vec, // ordered, deduplicated } #[derive(Debug)] pub struct MemInMsg { inp: PortId, msg: Payload, } #[derive(Debug)] pub struct IdManager { controller_id: ControllerId, port_suffix_stream: U32Stream, proto_component_suffix_stream: U32Stream, } #[derive(Debug)] pub struct EndpointManager { // invariants: // 1. endpoint N is registered READ | WRITE with poller // 2. Events is empty poll: Poll, events: Events, polled_undrained: IndexSet, delayed_messages: Vec<(usize, Msg)>, undelayed_messages: Vec<(usize, Msg)>, endpoint_exts: Vec, } #[derive(Debug, Default)] pub struct PortInfo { polarities: HashMap, peers: HashMap, routes: HashMap, } #[derive(Debug)] pub struct Connector { proto_description: Arc, proto_components: HashMap, logger: Box, id_manager: IdManager, native_ports: HashSet, port_info: PortInfo, phased: ConnectorPhased, } #[derive(Debug)] pub enum ConnectorPhased { Setup { endpoint_setups: Vec<(PortId, EndpointSetup)>, surplus_sockets: u16, }, Communication { round_index: usize, endpoint_manager: EndpointManager, neighborhood: Neighborhood, mem_inbox: Vec, native_batches: Vec, round_result: Result)>, SyncError>, }, } #[derive(Debug)] pub struct StringLogger(ControllerId, String); #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct Predicate { pub assigned: BTreeMap, } #[derive(Debug, Default)] pub struct NativeBatch { // invariant: putters' and getters' polarities respected to_put: HashMap, to_get: HashSet, } pub struct MonitoredReader { bytes: usize, r: R, } pub struct NonsyncProtoContext<'a> { logger: &'a mut dyn Logger, proto_component_id: ProtoComponentId, port_info: &'a mut PortInfo, id_manager: &'a mut IdManager, proto_component_ports: &'a mut HashSet, unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>, } pub struct SyncProtoContext<'a> { logger: &'a mut dyn Logger, predicate: &'a Predicate, proto_component_id: ProtoComponentId, port_info: &'a PortInfo, inbox: &'a HashMap, } // pub struct MonoPContext<'a> { // inner: &'a mut ControllerInner, // ports: &'a mut HashSet, // mono_ps: &'a mut Vec, // } // pub struct PolyPContext<'a> { // my_subtree_id: SubtreeId, // inner: &'a mut Connector, // solution_storage: &'a mut SolutionStorage, // } // impl PolyPContext<'_> { // #[inline(always)] // fn reborrow<'a>(&'a mut self) -> PolyPContext<'a> { // let Self { solution_storage, my_subtree_id, inner } = self; // PolyPContext { solution_storage, my_subtree_id: *my_subtree_id, inner } // } // } // struct BranchPContext<'m, 'r> { // m_ctx: PolyPContext<'m>, // ports: &'r HashSet, // predicate: &'r Predicate, // inbox: &'r HashMap, // } // #[derive(Debug)] // pub enum SyncRunResult { // BlockingForRecv, // AllBranchesComplete, // NoBranches, // } // #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] // pub enum PolyId { // N, // P { index: usize }, // } // #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] // pub enum SubtreeId { // PolyN, // PolyP { index: usize }, // ChildController { port: PortId }, // } // #[derive(Debug)] // pub struct NativeBranch { // gotten: HashMap, // to_get: HashSet, // } //////////////// impl PortInfo { fn firing_var_for(&self, port: PortId) -> FiringVar { FiringVar(match self.polarities.get(&port).unwrap() { Getter => port, Putter => *self.peers.get(&port).unwrap(), }) } } impl IdManager { fn new(controller_id: ControllerId) -> Self { Self { controller_id, port_suffix_stream: Default::default(), proto_component_suffix_stream: Default::default(), } } fn new_port_id(&mut self) -> PortId { Id { controller_id: self.controller_id, u32_suffix: self.port_suffix_stream.next() }.into() } fn new_proto_component_id(&mut self) -> ProtoComponentId { Id { controller_id: self.controller_id, u32_suffix: self.proto_component_suffix_stream.next(), } .into() } } impl Connector { pub fn new_port_pair(&mut self) -> [PortId; 2] { // adds two new associated ports, related to each other, and exposed to the native let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; self.native_ports.insert(o); self.native_ports.insert(i); // {polarity, peer, route} known. {} unknown. self.port_info.polarities.insert(o, Putter); self.port_info.polarities.insert(i, Getter); self.port_info.peers.insert(o, i); self.port_info.peers.insert(i, o); let route = Route::LocalComponent(LocalComponentId::Native); self.port_info.routes.insert(o, route); self.port_info.routes.insert(i, route); log!(self.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } pub fn add_component( &mut self, identifier: &[u8], ports: &[PortId], ) -> Result<(), AddComponentError> { // called by the USER. moves ports owned by the NATIVE use AddComponentError::*; // 1. check if this is OK let polarities = self.proto_description.component_polarities(identifier)?; if polarities.len() != ports.len() { return Err(WrongNumberOfParamaters { expected: polarities.len() }); } for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { if !self.native_ports.contains(port) { return Err(UnknownPort(*port)); } if expected_polarity != *self.port_info.polarities.get(port).unwrap() { return Err(WrongPortPolarity { port: *port, expected_polarity }); } } // 3. remove ports from old component & update port->route let new_id = self.id_manager.new_proto_component_id(); for port in ports.iter() { self.port_info .routes .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); } self.native_ports.retain(|port| !ports.contains(port)); // 4. add new component self.proto_components.insert( new_id, ProtoComponent { state: self.proto_description.new_main_component(identifier, ports), ports: ports.iter().copied().collect(), }, ); Ok(()) } } impl EndpointManager { fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), ()> { self.endpoint_exts[index].endpoint.send(msg) } fn try_recv_any(&mut self, deadline: Instant) -> Result<(usize, Msg), TryRecyAnyError> { use TryRecyAnyError::*; // 1. try messages already buffered if let Some(x) = self.undelayed_messages.pop() { return Ok(x); } loop { // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained while let Some(index) = self.polled_undrained.pop() { let endpoint = &mut self.endpoint_exts[index].endpoint; if let Some(msg) = endpoint.try_recv().map_err(|error| EndpointError { error, index })? { if !endpoint.inbox.is_empty() { // there may be another message waiting! self.polled_undrained.insert(index); } return Ok((index, msg)); } } // 3. No message yet. Do we have enough time to poll? let remaining = deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?; self.poll.poll(&mut self.events, Some(remaining)).map_err(|_| PollFailed)?; for event in self.events.iter() { let Token(index) = event.token(); self.polled_undrained.insert(index); } self.events.clear(); } } fn undelay_all(&mut self) { if self.undelayed_messages.is_empty() { // fast path std::mem::swap(&mut self.delayed_messages, &mut self.undelayed_messages); return; } // slow path self.undelayed_messages.extend(self.delayed_messages.drain(..)); } } impl Debug for Endpoint { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_struct("Endpoint").field("inbox", &self.inbox).finish() } } impl From for MonitoredReader { fn from(r: R) -> Self { Self { r, bytes: 0 } } } impl MonitoredReader { pub fn bytes_read(&self) -> usize { self.bytes } } impl Read for MonitoredReader { fn read(&mut self, buf: &mut [u8]) -> Result { let n = self.r.read(buf)?; self.bytes += n; Ok(n) } } impl Into for SetupMsg { fn into(self) -> Msg { Msg::SetupMsg(self) } } impl StringLogger { pub fn new(controller_id: ControllerId) -> Self { Self(controller_id, String::default()) } } impl Drop for StringLogger { fn drop(&mut self) { let stdout = std::io::stdout(); let mut lock = stdout.lock(); writeln!(lock, "--- DROP LOG DUMP ---").unwrap(); self.dump_log(&mut lock); } } impl Logger for StringLogger { fn line_writer(&mut self) -> &mut dyn std::fmt::Write { use std::fmt::Write; let _ = write!(&mut self.1, "\nCID({}): ", self.0); self } fn dump_log(&self, w: &mut dyn std::io::Write) { let _ = w.write(self.1.as_bytes()); } } impl std::fmt::Write for StringLogger { fn write_str(&mut self, s: &str) -> Result<(), std::fmt::Error> { self.1.write_str(s) } } impl Endpoint { fn try_recv(&mut self) -> Result, EndpointError> { use EndpointError::*; // populate inbox as much as possible 'read_loop: loop { match self.stream.read_to_end(&mut self.inbox) { Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break 'read_loop, Ok(0) => break 'read_loop, Ok(_) => (), Err(_e) => return Err(BrokenEndpoint), } } let mut monitored = MonitoredReader::from(&self.inbox[..]); match bincode::deserialize_from(&mut monitored) { Ok(msg) => { let msg_size = monitored.bytes_read(); self.inbox.drain(0..(msg_size.try_into().unwrap())); Ok(Some(msg)) } Err(e) => match *e { bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { Ok(None) } _ => Err(MalformedMessage), // println!("SERDE ERRKIND {:?}", e); // Err(MalformedMessage) }, } } fn send(&mut self, msg: &T) -> Result<(), ()> { bincode::serialize_into(&mut self.stream, msg).map_err(drop) } } impl Connector { pub fn get_logger(&self) -> &dyn Logger { &*self.logger } pub fn print_state(&self) { let stdout = std::io::stdout(); let mut lock = stdout.lock(); writeln!( lock, "--- Connector with ControllerId={:?}.\n::LOG_DUMP:\n", self.id_manager.controller_id ) .unwrap(); self.get_logger().dump_log(&mut lock); writeln!(lock, "\n\nDEBUG_PRINT:\n{:#?}\n", self).unwrap(); } } // impl Debug for SolutionStorage { // fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { // f.pad("Solutions: [")?; // for (subtree_id, &index) in self.subtree_id_to_index.iter() { // let sols = &self.subtree_solutions[index]; // f.write_fmt(format_args!("{:?}: {:?}, ", subtree_id, sols))?; // } // f.pad("]") // } // } impl Predicate { #[inline] pub fn inserted(mut self, k: FiringVar, v: bool) -> Self { self.assigned.insert(k, v); self } // returns true IFF self.unify would return Equivalent OR FormerNotLatter pub fn satisfies(&self, other: &Self) -> bool { let mut s_it = self.assigned.iter(); let mut s = if let Some(s) = s_it.next() { s } else { return other.assigned.is_empty(); }; for (oid, ob) in other.assigned.iter() { while s.0 < oid { s = if let Some(s) = s_it.next() { s } else { return false; }; } if s.0 > oid || s.1 != ob { return false; } } true } /// Given self and other, two predicates, return the most general Predicate possible, N /// such that n.satisfies(self) && n.satisfies(other). /// If none exists Nonexistant is returned. /// If the resulting predicate is equivlanet to self, other, or both, /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. /// otherwise New(N) is returned. pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { use CommonSatResult as Csr; // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; let [mut s, mut o] = [s_it.next(), o_it.next()]; // lists of assignments in self but not other and vice versa. let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; loop { match [s, o] { [None, None] => break, [None, Some(x)] => { o_not_s.push(x); o_not_s.extend(o_it); break; } [Some(x), None] => { s_not_o.push(x); s_not_o.extend(s_it); break; } [Some((sid, sb)), Some((oid, ob))] => { if sid < oid { // o is missing this element s_not_o.push((sid, sb)); s = s_it.next(); } else if sid > oid { // s is missing this element o_not_s.push((oid, ob)); o = o_it.next(); } else if sb != ob { assert_eq!(sid, oid); // both predicates assign the variable but differ on the value return Csr::Nonexistant; } else { // both predicates assign the variable to the same value s = s_it.next(); o = o_it.next(); } } } } // Observed zero inconsistencies. A unified predicate exists... match [s_not_o.is_empty(), o_not_s.is_empty()] { [true, true] => Csr::Equivalent, // ... equivalent to both. [false, true] => Csr::FormerNotLatter, // ... equivalent to self. [true, false] => Csr::LatterNotFormer, // ... equivalent to other. [false, false] => { // ... which is the union of the predicates' assignments but // is equivalent to neither self nor other. let mut new = self.clone(); for (&id, &b) in o_not_s { new.assigned.insert(id, b); } Csr::New(new) } } } // pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { // self.assigned // .iter() // .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) // } // pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator, value: bool) { // for channel_id in channel_ids { // self.assigned.entry(channel_id).or_insert(value); // } // } // pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option { // self.assigned.insert(channel_id, value) // } pub fn union_with(&self, other: &Self) -> Option { let mut res = self.clone(); for (&channel_id, &assignment_1) in other.assigned.iter() { match res.assigned.insert(channel_id, assignment_1) { Some(assignment_2) if assignment_1 != assignment_2 => return None, _ => {} } } Some(res) } pub fn query(&self, var: FiringVar) -> Option { self.assigned.get(&var).copied() } } impl Debug for Predicate { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { struct MySet<'a>(&'a Predicate, bool); impl Debug for MySet<'_> { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { let iter = self.0.assigned.iter().filter_map(|(port, &firing)| { if firing == self.1 { Some(port) } else { None } }); f.debug_set().entries(iter).finish() } } f.debug_struct("Predicate") .field("Trues", &MySet(self, true)) .field("Falses", &MySet(self, false)) .finish() } }