/// cbindgen:ignore mod communication; /// cbindgen:ignore mod endpoints; pub mod error; /// cbindgen:ignore mod logging; /// cbindgen:ignore mod setup; #[cfg(test)] mod tests; use crate::common::*; use error::*; use mio::net::UdpSocket; #[derive(Debug)] pub struct Connector { unphased: ConnectorUnphased, phased: ConnectorPhased, } pub trait Logger: Debug { fn line_writer(&mut self) -> Option<&mut dyn std::io::Write>; } #[derive(Debug)] pub struct VecLogger(ConnectorId, Vec); #[derive(Debug)] pub struct DummyLogger; #[derive(Debug)] pub struct FileLogger(ConnectorId, std::fs::File); pub(crate) struct NonsyncProtoContext<'a> { logger: &'a mut dyn Logger, proto_component_id: ProtoComponentId, port_info: &'a mut PortInfo, id_manager: &'a mut IdManager, proto_component_ports: &'a mut HashSet, unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>, } pub(crate) struct SyncProtoContext<'a> { logger: &'a mut dyn Logger, untaken_choice: &'a mut Option, predicate: &'a Predicate, port_info: &'a PortInfo, inbox: &'a HashMap, } #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] struct SpecVar(PortId); #[derive( Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, )] struct SpecVal(u16); #[derive(Debug)] struct RoundOk { batch_index: usize, gotten: HashMap, } #[derive(Default)] struct VecSet { // invariant: ordered, deduplicated vec: Vec, } #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] enum ComponentId { Native, Proto(ProtoComponentId), } #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] enum Route { LocalComponent(ComponentId), NetEndpoint { index: usize }, UdpEndpoint { index: usize }, } #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] enum SubtreeId { LocalComponent(ComponentId), NetEndpoint { index: usize }, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct MyPortInfo { polarity: Polarity, port: PortId, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] enum Decision { Failure, Success(Predicate), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] enum Msg { SetupMsg(SetupMsg), CommMsg(CommMsg), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] enum SetupMsg { MyPortInfo(MyPortInfo), LeaderWave { wave_leader: ConnectorId }, LeaderAnnounce { tree_leader: ConnectorId }, YouAreMyParent, SessionGather { unoptimized_map: HashMap }, SessionScatter { optimized_map: HashMap }, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct SessionInfo { serde_proto_description: SerdeProtocolDescription, port_info: PortInfo, endpoint_incoming_to_getter: Vec, proto_components: HashMap, } #[derive(Debug, Clone)] struct SerdeProtocolDescription(Arc); #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct CommMsg { round_index: usize, contents: CommMsgContents, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] enum CommMsgContents { SendPayload(SendPayloadMsg), CommCtrl(CommCtrlMsg), } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] enum CommCtrlMsg { Suggest { suggestion: Decision }, // SINKWARD Announce { decision: Decision }, // SINKAWAYS } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct SendPayloadMsg { predicate: Predicate, payload: Payload, } #[derive(Debug, PartialEq)] enum AssignmentUnionResult { FormerNotLatter, LatterNotFormer, Equivalent, New(Predicate), Nonexistant, } struct NetEndpoint { inbox: Vec, stream: TcpStream, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] struct ProtoComponent { state: ComponentState, ports: HashSet, } #[derive(Debug, Clone)] struct NetEndpointSetup { getter_for_incoming: PortId, sock_addr: SocketAddr, endpoint_polarity: EndpointPolarity, } #[derive(Debug, Clone)] struct UdpEndpointSetup { getter_for_incoming: PortId, local_addr: SocketAddr, peer_addr: SocketAddr, } #[derive(Debug)] struct NetEndpointExt { net_endpoint: NetEndpoint, getter_for_incoming: PortId, } #[derive(Debug)] struct Neighborhood { parent: Option, children: VecSet, } #[derive(Debug)] struct IdManager { connector_id: ConnectorId, port_suffix_stream: U32Stream, proto_component_suffix_stream: U32Stream, } #[derive(Debug)] struct UdpInBuffer { byte_vec: Vec, } #[derive(Debug)] struct SpecVarStream { connector_id: ConnectorId, port_suffix_stream: U32Stream, } #[derive(Debug)] struct EndpointManager { // invariants: // 1. net and udp endpoints are registered with poll. Poll token computed with TargetToken::into // 2. Events is empty poll: Poll, events: Events, delayed_messages: Vec<(usize, Msg)>, undelayed_messages: Vec<(usize, Msg)>, net_endpoint_store: EndpointStore, udp_endpoint_store: EndpointStore, udp_in_buffer: UdpInBuffer, } #[derive(Debug)] struct EndpointStore { endpoint_exts: Vec, polled_undrained: VecSet, } #[derive(Debug)] struct UdpEndpointExt { sock: UdpSocket, // already bound and connected outgoing_payloads: HashMap, incoming_round_spec_var: Option, getter_for_incoming: PortId, incoming_payloads: Vec, } #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] struct PortInfo { polarities: HashMap, peers: HashMap, routes: HashMap, } #[derive(Debug)] struct ConnectorCommunication { round_index: usize, endpoint_manager: EndpointManager, neighborhood: Neighborhood, native_batches: Vec, round_result: Result, SyncError>, } #[derive(Debug)] struct ConnectorUnphased { proto_description: Arc, proto_components: HashMap, logger: Box, id_manager: IdManager, native_ports: HashSet, port_info: PortInfo, } #[derive(Debug)] struct ConnectorSetup { net_endpoint_setups: Vec, udp_endpoint_setups: Vec, surplus_sockets: u16, } #[derive(Debug)] enum ConnectorPhased { Setup(Box), Communication(Box), } #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] struct Predicate { assigned: BTreeMap, } #[derive(Debug, Default)] struct NativeBatch { // invariant: putters' and getters' polarities respected to_put: HashMap, to_get: HashSet, } #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] enum TokenTarget { NetEndpoint { index: usize }, UdpEndpoint { index: usize }, Waker, } trait RoundCtxTrait { fn get_deadline(&self) -> &Option; fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg); } enum CommRecvOk { TimeoutWithoutNew, NewPayloadMsgs, NewControlMsg { net_index: usize, msg: CommCtrlMsg }, } //////////////// fn would_block(err: &std::io::Error) -> bool { err.kind() == std::io::ErrorKind::WouldBlock } impl TokenTarget { const HALFWAY_INDEX: usize = usize::MAX / 2; const MAX_INDEX: usize = usize::MAX; const WAKER_TOKEN: usize = Self::MAX_INDEX; } impl From for TokenTarget { fn from(Token(index): Token) -> Self { if index == Self::WAKER_TOKEN { TokenTarget::Waker } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) { TokenTarget::UdpEndpoint { index: shifted } } else { TokenTarget::NetEndpoint { index } } } } impl Into for TokenTarget { fn into(self) -> Token { match self { TokenTarget::Waker => Token(Self::WAKER_TOKEN), TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX), TokenTarget::NetEndpoint { index } => Token(index), } } } impl VecSet { fn new(mut vec: Vec) -> Self { vec.sort(); vec.dedup(); Self { vec } } fn contains(&self, element: &T) -> bool { self.vec.binary_search(element).is_ok() } fn insert(&mut self, element: T) -> bool { match self.vec.binary_search(&element) { Ok(_) => false, Err(index) => { self.vec.insert(index, element); true } } } fn iter(&self) -> std::slice::Iter { self.vec.iter() } fn pop(&mut self) -> Option { self.vec.pop() } } impl PortInfo { fn spec_var_for(&self, port: PortId) -> SpecVar { SpecVar(match self.polarities.get(&port).unwrap() { Getter => port, Putter => *self.peers.get(&port).unwrap(), }) } } impl SpecVarStream { fn next(&mut self) -> SpecVar { let phantom_port: PortId = Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() } .into(); SpecVar(phantom_port) } } impl IdManager { fn new(connector_id: ConnectorId) -> Self { Self { connector_id, port_suffix_stream: Default::default(), proto_component_suffix_stream: Default::default(), } } fn new_spec_var_stream(&self) -> SpecVarStream { SpecVarStream { connector_id: self.connector_id, port_suffix_stream: self.port_suffix_stream.clone(), } } fn new_port_id(&mut self) -> PortId { Id { connector_id: self.connector_id, u32_suffix: self.port_suffix_stream.next() }.into() } fn new_proto_component_id(&mut self) -> ProtoComponentId { Id { connector_id: self.connector_id, u32_suffix: self.proto_component_suffix_stream.next(), } .into() } } impl Drop for Connector { fn drop(&mut self) { log!(&mut *self.unphased.logger, "Connector dropping. Goodbye!"); } } impl Connector { pub(crate) fn random_id() -> ConnectorId { type Bytes8 = [u8; std::mem::size_of::()]; unsafe { let mut bytes = std::mem::MaybeUninit::::uninit(); // getrandom is the canonical crate for a small, secure rng getrandom::getrandom(&mut *bytes.as_mut_ptr()).unwrap(); // safe! representations of all valid Byte8 values are valid ConnectorId values std::mem::transmute::<_, _>(bytes.assume_init()) } } pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { std::mem::swap(&mut self.unphased.logger, &mut new_logger); new_logger } pub fn get_logger(&mut self) -> &mut dyn Logger { &mut *self.unphased.logger } pub fn new_port_pair(&mut self) -> [PortId; 2] { let cu = &mut self.unphased; // adds two new associated ports, related to each other, and exposed to the native let [o, i] = [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()]; cu.native_ports.insert(o); cu.native_ports.insert(i); // {polarity, peer, route} known. {} unknown. cu.port_info.polarities.insert(o, Putter); cu.port_info.polarities.insert(i, Getter); cu.port_info.peers.insert(o, i); cu.port_info.peers.insert(i, o); let route = Route::LocalComponent(ComponentId::Native); cu.port_info.routes.insert(o, route); cu.port_info.routes.insert(i, route); log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } pub fn add_component( &mut self, identifier: &[u8], ports: &[PortId], ) -> Result<(), AddComponentError> { // called by the USER. moves ports owned by the NATIVE use AddComponentError as Ace; // 1. check if this is OK let cu = &mut self.unphased; let polarities = cu.proto_description.component_polarities(identifier)?; if polarities.len() != ports.len() { return Err(Ace::WrongNumberOfParamaters { expected: polarities.len() }); } for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { if !cu.native_ports.contains(port) { return Err(Ace::UnknownPort(*port)); } if expected_polarity != *cu.port_info.polarities.get(port).unwrap() { return Err(Ace::WrongPortPolarity { port: *port, expected_polarity }); } } // 3. remove ports from old component & update port->route let new_id = cu.id_manager.new_proto_component_id(); for port in ports.iter() { cu.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id))); } cu.native_ports.retain(|port| !ports.contains(port)); // 4. add new component cu.proto_components.insert( new_id, ProtoComponent { state: cu.proto_description.new_main_component(identifier, ports), ports: ports.iter().copied().collect(), }, ); Ok(()) } } impl Predicate { #[inline] pub fn inserted(mut self, k: SpecVar, v: SpecVal) -> Self { self.assigned.insert(k, v); self } pub fn assigns_subset(&self, maybe_superset: &Self) -> bool { for (var, val) in self.assigned.iter() { match maybe_superset.assigned.get(var) { Some(val2) if val2 == val => {} _ => return false, // var unmapped, or mapped differently } } true } // returns true IFF self.unify would return Equivalent OR FormerNotLatter // pub fn consistent_with(&self, other: &Self) -> bool { // let [larger, smaller] = // if self.assigned.len() > other.assigned.len() { [self, other] } else { [other, self] }; // for (var, val) in smaller.assigned.iter() { // match larger.assigned.get(var) { // Some(val2) if val2 != val => return false, // _ => {} // } // } // true // } /// Given self and other, two predicates, return the predicate whose /// assignments are the union of those of self and other. /// fn assignment_union(&self, other: &Self) -> AssignmentUnionResult { use AssignmentUnionResult as Aur; // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; let [mut s, mut o] = [s_it.next(), o_it.next()]; // lists of assignments in self but not other and vice versa. let [mut s_not_o, mut o_not_s] = [vec![], vec![]]; loop { match [s, o] { [None, None] => break, [None, Some(x)] => { o_not_s.push(x); o_not_s.extend(o_it); break; } [Some(x), None] => { s_not_o.push(x); s_not_o.extend(s_it); break; } [Some((sid, sb)), Some((oid, ob))] => { if sid < oid { // o is missing this element s_not_o.push((sid, sb)); s = s_it.next(); } else if sid > oid { // s is missing this element o_not_s.push((oid, ob)); o = o_it.next(); } else if sb != ob { assert_eq!(sid, oid); // both predicates assign the variable but differ on the value return Aur::Nonexistant; } else { // both predicates assign the variable to the same value s = s_it.next(); o = o_it.next(); } } } } // Observed zero inconsistencies. A unified predicate exists... match [s_not_o.is_empty(), o_not_s.is_empty()] { [true, true] => Aur::Equivalent, // ... equivalent to both. [false, true] => Aur::FormerNotLatter, // ... equivalent to self. [true, false] => Aur::LatterNotFormer, // ... equivalent to other. [false, false] => { // ... which is the union of the predicates' assignments but // is equivalent to neither self nor other. let mut new = self.clone(); for (&id, &b) in o_not_s { new.assigned.insert(id, b); } Aur::New(new) } } } pub fn union_with(&self, other: &Self) -> Option { let mut res = self.clone(); for (&channel_id, &assignment_1) in other.assigned.iter() { match res.assigned.insert(channel_id, assignment_1) { Some(assignment_2) if assignment_1 != assignment_2 => return None, _ => {} } } Some(res) } pub fn query(&self, var: SpecVar) -> Option { self.assigned.get(&var).copied() } } impl Debug for VecSet { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_set().entries(self.vec.iter()).finish() } } impl Debug for Predicate { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_tuple("Predicate").field(&self.assigned).finish() } } impl serde::Serialize for SerdeProtocolDescription { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, { let inner: &ProtocolDescription = &self.0; inner.serialize(serializer) } } impl<'de> serde::Deserialize<'de> for SerdeProtocolDescription { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { let inner: ProtocolDescription = ProtocolDescription::deserialize(deserializer)?; Ok(Self(Arc::new(inner))) } } impl Debug for SpecVar { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_tuple("vrID").field(&self.0).finish() } } impl SpecVal { const FIRING: Self = SpecVal(1); const SILENT: Self = SpecVal(0); fn is_firing(self) -> bool { self == Self::FIRING // all else treated as SILENT } fn nth_domain_element(n: usize) -> Self { let n: u16 = n.try_into().unwrap(); SpecVal(n) } fn iter_domain() -> impl Iterator { (0..).map(SpecVal) } } impl Debug for SpecVal { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { self.0.fmt(f) } } impl Default for UdpInBuffer { fn default() -> Self { let mut byte_vec = Vec::with_capacity(Self::CAPACITY); unsafe { // safe! this vector is guaranteed to have sufficient capacity byte_vec.set_len(Self::CAPACITY); } Self { byte_vec } } } impl UdpInBuffer { const CAPACITY: usize = u16::MAX as usize; fn as_mut_slice(&mut self) -> &mut [u8] { self.byte_vec.as_mut_slice() } }