diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 592040558a0a2487e01a595f95fc52ce4fc4c610..6ae66019b5754d73bf9fd9c1e5d992d8304c83c4 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -123,6 +123,10 @@ struct CommMsg { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] enum CommMsgContents { SendPayload(SendPayloadMsg), + CommCtrl(CommCtrlMsg), +} +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +enum CommCtrlMsg { Suggest { suggestion: Decision }, // SINKWARD Announce { decision: Decision }, // SINKAWAYS } @@ -176,6 +180,10 @@ struct IdManager { proto_component_suffix_stream: U32Stream, } #[derive(Debug)] +struct UdpInBuffer { + byte_vec: Vec, +} +#[derive(Debug)] struct SpecVarStream { connector_id: ConnectorId, port_suffix_stream: U32Stream, @@ -187,16 +195,24 @@ struct EndpointManager { // 2. Events is empty poll: Poll, events: Events, - polled_undrained: VecSet, delayed_messages: Vec<(usize, Msg)>, undelayed_messages: Vec<(usize, Msg)>, - net_endpoint_exts: Vec, + net_endpoint_store: EndpointStore, + udp_endpoint_store: EndpointStore, + udp_in_buffer: UdpInBuffer, +} +#[derive(Debug)] +struct EndpointStore { + endpoint_exts: Vec, + polled_undrained: VecSet, } +#[derive(Debug)] struct UdpEndpointExt { - sock: UdpSocket, + sock: UdpSocket, // already bound and connected + outgoing_payloads: HashMap, + incoming_round_spec_var: Option, getter_for_incoming: PortId, - outgoing_buffer: HashMap, - incoming_buffer: Vec, + incoming_payloads: Vec, } #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] struct PortInfo { @@ -247,6 +263,15 @@ enum TokenTarget { UdpEndpoint { index: usize }, Waker, } +trait RoundCtxTrait { + fn get_deadline(&self) -> &Option; + fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg); +} +enum CommRecvOk { + TimeoutWithoutNew, + NewPayloadMsgs, + NewControlMsg { net_endpoint_index: usize, msg: CommCtrlMsg }, +} //////////////// fn would_block(err: &std::io::Error) -> bool { err.kind() == std::io::ErrorKind::WouldBlock @@ -258,7 +283,7 @@ impl TokenTarget { } impl From for TokenTarget { fn from(Token(index): Token) -> Self { - if index == Self::MAX_INDEX { + if index == Self::WAKER_TOKEN { TokenTarget::Waker } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) { TokenTarget::UdpEndpoint { index: shifted } @@ -270,7 +295,7 @@ impl From for TokenTarget { impl Into for TokenTarget { fn into(self) -> Token { match self { - TokenTarget::Waker => Token(Self::MAX_INDEX), + TokenTarget::Waker => Token(Self::WAKER_TOKEN), TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX), TokenTarget::NetEndpoint { index } => Token(index), } @@ -426,6 +451,17 @@ impl Predicate { self.assigned.insert(k, v); self } + + pub fn assigns_subset(&self, maybe_superset: &Self) -> bool { + for (var, val) in self.assigned.iter() { + match maybe_superset.assigned.get(var) { + Some(val2) if val2 == val => {} + _ => return false, + } + } + true + } + // returns true IFF self.unify would return Equivalent OR FormerNotLatter pub fn consistent_with(&self, other: &Self) -> bool { let [larger, smaller] = @@ -557,6 +593,10 @@ impl SpecVal { self == Self::FIRING // all else treated as SILENT } + fn nth_domain_element(n: usize) -> Self { + let n: u16 = n.try_into().unwrap(); + SpecVal(n) + } fn iter_domain() -> impl Iterator { (0..).map(SpecVal) } @@ -566,3 +606,19 @@ impl Debug for SpecVal { self.0.fmt(f) } } +impl Default for UdpInBuffer { + fn default() -> Self { + let mut byte_vec = Vec::with_capacity(Self::CAPACITY); + unsafe { + // safe! this vector is guaranteed to have sufficient capacity + byte_vec.set_len(Self::CAPACITY); + } + Self { byte_vec } + } +} +impl UdpInBuffer { + const CAPACITY: usize = u16::MAX as usize; + fn as_mut_slice(&mut self) -> &mut [u8] { + self.byte_vec.as_mut_slice() + } +}