diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs
index 801b102b746ad94f712dfb044fdd569bdeece28d..dcbff96cf8c2fe45ed3090bd16d785ea96886f89 100644
--- a/src/runtime/communication.rs
+++ b/src/runtime/communication.rs
@@ -61,6 +61,14 @@ impl ReplaceBoolTrue for bool {
 }
 
 ////////////////
+impl RoundCtxTrait for RoundCtx {
+    fn get_deadline(&self) -> &Option<Instant> {
+        &self.deadline
+    }
+    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) {
+        self.getter_buffer.getter_add(getter, msg)
+    }
+}
 impl Connector {
     pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> {
         use GottenError as Ge;
@@ -301,6 +309,8 @@ impl Connector {
         }
         // restore the invariant: !native_batches.is_empty()
         comm.native_batches.push(Default::default());
+
+        comm.endpoint_manager.udp_endpoints_round_start(&mut *cu.logger, &mut rctx.spec_var_stream);
         // Call to another big method; keep running this round until a distributed decision is reached
         let decision = Self::sync_reach_decision(
             cu,
@@ -310,11 +320,14 @@ impl Connector {
             &mut rctx,
         )?;
         log!(cu.logger, "Committing to decision {:?}!", &decision);
+        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger, &decision)?;
 
         // propagate the decision to children
         let msg = Msg::CommMsg(CommMsg {
             round_index: comm.round_index,
-            contents: CommMsgContents::Announce { decision: decision.clone() },
+            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Announce {
+                decision: decision.clone(),
+            }),
         });
         log!(
             cu.logger,
@@ -504,7 +517,9 @@ impl Connector {
                 let suggestion = Decision::Success(solution);
                 let msg = Msg::CommMsg(CommMsg {
                     round_index: comm.round_index,
-                    contents: CommMsgContents::Suggest { suggestion },
+                    contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest {
+                        suggestion,
+                    }),
                 });
                 comm.endpoint_manager.send_to_comms(parent, &msg)?;
             }
@@ -519,73 +534,38 @@ impl Connector {
         // try recv messages arriving through endpoints
         log!(cu.logger, "No decision yet. Let's recv an endpoint msg...");
         {
-            let (endpoint_index, msg) = loop {
-                match comm
-                    .endpoint_manager
-                    .try_recv_any_comms(&mut *cu.logger, rctx.deadline)?
-                {
-                    None => {
-                        log!(cu.logger, "Reached user-defined deadling without decision...");
-                        if let Some(parent) = comm.neighborhood.parent {
-                            if already_requested_failure.replace_with_true() {
-                                Self::request_failure(cu, comm, parent)?
-                            } else {
-                                log!(cu.logger, "Already requested failure");
-                            }
+            let (endpoint_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm
+                .endpoint_manager
+                .try_recv_any_comms(&mut *cu.logger, &cu.port_info, rctx, comm.round_index)?
+            {
+                CommRecvOk::NewControlMsg { net_endpoint_index, msg } => {
+                    (net_endpoint_index, msg)
+                }
+                CommRecvOk::NewPayloadMsgs => continue 'undecided,
+                CommRecvOk::TimeoutWithoutNew => {
+                    log!(cu.logger, "Reached user-defined deadline without decision...");
+                    if let Some(parent) = comm.neighborhood.parent {
+                        if already_requested_failure.replace_with_true() {
+                            Self::request_failure(cu, comm, parent)?
                         } else {
-                            log!(cu.logger, "As the leader, deciding on timeout");
-                            return Ok(Decision::Failure);
+                            log!(cu.logger, "Already requested failure");
                         }
-                        rctx.deadline = None;
+                    } else {
+                        log!(cu.logger, "As the leader, deciding on timeout");
+                        return Ok(Decision::Failure);
                     }
-                    Some((endpoint_index, msg)) => break (endpoint_index, msg),
-                }
-            };
-            log!(cu.logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg);
-            let comm_msg_contents = match msg {
-                Msg::SetupMsg(..) => {
-                    log!(cu.logger, "Discarding setup message; that phase is over");
+                    rctx.deadline = None;
                     continue 'undecided;
                 }
-                Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&comm.round_index) {
-                    Ordering::Equal => comm_msg.contents,
-                    Ordering::Less => {
-                        log!(
-                            cu.logger,
-                            "We are in round {}, but msg is for round {}. Discard",
-                            comm_msg.round_index,
-                            comm.round_index,
-                        );
-                        drop(comm_msg);
-                        continue 'undecided;
-                    }
-                    Ordering::Greater => {
-                        log!(
-                            cu.logger,
-                            "We are in round {}, but msg is for round {}. Buffer",
-                            comm_msg.round_index,
-                            comm.round_index,
-                        );
-                        comm.endpoint_manager
-                            .delayed_messages
-                            .push((endpoint_index, Msg::CommMsg(comm_msg)));
-                        continue 'undecided;
-                    }
-                },
             };
-            match comm_msg_contents {
-                CommMsgContents::SendPayload(send_payload_msg) => {
-                    let getter = comm.endpoint_manager.net_endpoint_exts[endpoint_index]
-                        .getter_for_incoming;
-                    assert!(cu.port_info.polarities.get(&getter) == Some(&Getter));
-                    log!(
-                        cu.logger,
-                        "Msg routed to getter port {:?}. Buffer for recv loop",
-                        getter,
-                    );
-                    rctx.getter_buffer.getter_add(getter, send_payload_msg);
-                }
-                CommMsgContents::Suggest { suggestion } => {
+            log!(
+                cu.logger,
+                "Received from endpoint {} ctrl msg {:?}",
+                endpoint_index,
+                &comm_ctrl_msg
+            );
+            match comm_ctrl_msg {
+                CommCtrlMsg::Suggest { suggestion } => {
                     // only accept this control msg through a child endpoint
                     if comm.neighborhood.children.contains(&endpoint_index) {
                         match suggestion {
@@ -626,7 +606,7 @@ impl Connector {
                             );
                         }
                     }
-                    CommMsgContents::Announce { decision } => {
+                    CommCtrlMsg::Announce { decision } => {
                         if Some(endpoint_index) == comm.neighborhood.parent {
                             // adopt this decision
                             return Ok(decision);
@@ -653,7 +633,7 @@ impl Connector {
         let suggestion = Decision::Failure;
         let msg = Msg::CommMsg(CommMsg {
             round_index: comm.round_index,
-            contents: CommMsgContents::Suggest { suggestion },
+            contents: CommMsgContents::CommCtrl(CommCtrlMsg::Suggest { suggestion }),
         });
         comm.endpoint_manager.send_to_comms(parent, &msg)
     }
@@ -758,7 +738,7 @@ impl BranchingNative {
             self.branches.keys()
         );
         for (branch_predicate, branch) in self.branches {
-            if branch.to_get.is_empty() && solution_predicate.consistent_with(&branch_predicate) {
+            if branch.to_get.is_empty() && branch_predicate.assigns_subset(solution_predicate) {
                 let NativeBranch { index, gotten, .. } = branch;
                 log!(logger, "Collapsed native has gotten {:?}", &gotten);
                 return RoundOk { batch_index: index, gotten };
@@ -937,7 +917,7 @@ impl BranchingProtoComponent {
     fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
         let BranchingProtoComponent { ports, branches } = self;
         for (branch_predicate, branch) in branches {
-            if branch.ended && solution_predicate.consistent_with(&branch_predicate) {
+            if branch.ended && branch_predicate.assigns_subset(solution_predicate) {
                 let ProtoComponentBranch { state, .. } = branch;
                 return ProtoComponent { state, ports };
             }
diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs
index a105ff1777b3e656ab79ee71268f1f204ff707e0..551f2604b1075e9b52f53715c175f3b815d10dc1 100644
--- a/src/runtime/endpoints.rs
+++ b/src/runtime/endpoints.rs
@@ -4,11 +4,13 @@ struct MonitoredReader<R: Read> {
     bytes: usize,
     r: R,
 }
-#[derive(Debug)]
-enum TryRecyAnyError {
-    Timeout,
+enum PollAndPopulateError {
     PollFailed,
-    EndpointError { error: EndpointError, index: usize },
+    Timeout,
+}
+struct TryRecvAnyNetError {
+    error: NetEndpointError,
+    index: usize,
 }
 /////////////////////
 impl NetEndpoint {
@@ -18,8 +20,8 @@ impl NetEndpoint {
     pub(super) fn try_recv(
         &mut self,
         logger: &mut dyn Logger,
-    ) -> Result<Option<Msg>, EndpointError> {
-        use EndpointError as Ee;
+    ) -> Result<Option<Msg>, NetEndpointError> {
+        use NetEndpointError as Nee;
         // populate inbox as much as possible
         let before_len = self.inbox.len();
         'read_loop: loop {
@@ -28,7 +30,7 @@ impl NetEndpoint {
                 Err(e) if would_block(&e) => break 'read_loop,
                 Ok(0) => break 'read_loop,
                 Ok(_) => (),
-                Err(_e) => return Err(Ee::BrokenEndpoint),
+                Err(_e) => return Err(Nee::BrokenNetEndpoint),
             }
         }
         endptlog!(
@@ -57,14 +59,19 @@ impl NetEndpoint {
                 bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => {
                     Ok(None)
                 }
-                _ => Err(Ee::MalformedMessage),
+                _ => Err(Nee::MalformedMessage),
             },
         }
     }
-    pub(super) fn send<T: serde::ser::Serialize>(&mut self, msg: &T) -> Result<(), EndpointError> {
+    pub(super) fn send<T: serde::ser::Serialize>(
+        &mut self,
+        msg: &T,
+    ) -> Result<(), NetEndpointError> {
         use bincode::config::Options;
-        use EndpointError as Ee;
-        Self::bincode_opts().serialize_into(&mut self.stream, msg).map_err(|_| Ee::BrokenEndpoint)
+        use NetEndpointError as Nee;
+        Self::bincode_opts()
+            .serialize_into(&mut self.stream, msg)
+            .map_err(|_| Nee::BrokenNetEndpoint)
     }
 }
 
@@ -73,7 +80,7 @@ impl EndpointManager {
         0..self.num_net_endpoints()
     }
     pub(super) fn num_net_endpoints(&self) -> usize {
-        self.net_endpoint_exts.len()
+        self.net_endpoint_store.endpoint_exts.len()
    }
     pub(super) fn send_to_comms(
         &mut self,
@@ -81,90 +88,247 @@ impl EndpointManager {
         msg: &Msg,
     ) -> Result<(), UnrecoverableSyncError> {
         use UnrecoverableSyncError as Use;
-        let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
-        net_endpoint.send(msg).map_err(|_| Use::BrokenEndpoint(index))
+        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
+        net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index })
     }
     pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> {
-        let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
+        let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
         net_endpoint.send(msg).map_err(|err| {
-            ConnectError::EndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
+            ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err)
         })
     }
-    pub(super) fn try_recv_any_comms(
-        &mut self,
-        logger: &mut dyn Logger,
-        deadline: Option<Instant>,
-    ) -> Result<Option<(usize, Msg)>, UnrecoverableSyncError> {
-        use {TryRecyAnyError as Trae, UnrecoverableSyncError as Use};
-        match self.try_recv_any(logger, deadline) {
-            Ok(tup) => Ok(Some(tup)),
-            Err(Trae::Timeout) => Ok(None),
-            Err(Trae::PollFailed) => Err(Use::PollFailed),
-            Err(Trae::EndpointError { error: _, index }) => Err(Use::BrokenEndpoint(index)),
-        }
-    }
+
+    /// Receive the first message of any kind at all.
+    /// Why not return SetupMsg? Because often this message will be forwarded to several others,
+    /// and by returning a Msg, it can be serialized in-place (NetEndpoints allow the sending of Msg types!)
     pub(super) fn try_recv_any_setup(
         &mut self,
         logger: &mut dyn Logger,
-        deadline: Option<Instant>,
+        deadline: &Option<Instant>,
     ) -> Result<(usize, Msg), ConnectError> {
-        use {ConnectError as Ce, TryRecyAnyError as Trae};
-        self.try_recv_any(logger, deadline).map_err(|err| match err {
-            Trae::Timeout => Ce::Timeout,
-            Trae::PollFailed => Ce::PollFailed,
-            Trae::EndpointError { error, index } => Ce::EndpointSetupError(
-                self.net_endpoint_exts[index].net_endpoint.stream.local_addr().unwrap(),
-                error,
-            ),
-        })
+        ///////////////////////////////////////////
+        fn map_trane(
+            trane: TryRecvAnyNetError,
+            net_endpoint_store: &EndpointStore<NetEndpointExt>,
+        ) -> ConnectError {
+            ConnectError::NetEndpointSetupError(
+                net_endpoint_store.endpoint_exts[trane.index]
+                    .net_endpoint
+                    .stream
+                    .local_addr()
+                    .unwrap(), // stream must already be connected
+                trane.error,
+            )
+        }
+        ///////////////////////////////////////////
+        // try yield undelayed net message
+        if let Some(tup) = self.undelayed_messages.pop() {
+            endptlog!(logger, "RECV undelayed_msg {:?}", &tup);
+            return Ok(tup);
+        }
+        loop {
+            // try recv from some polled undrained NET endpoint
+            if let Some(tup) = self
+                .try_recv_undrained_net(logger)
+                .map_err(|trane| map_trane(trane, &self.net_endpoint_store))?
+            {
+                return Ok(tup);
+            }
+            // poll if time remains
+            self.poll_and_populate(logger, deadline)?;
+        }
     }
-    fn try_recv_any(
+
+    // drops all Setup messages,
+    // buffers all future round messages,
+    // drops all previous round messages,
+    // enqueues all current round SendPayload messages using round_ctx.getter_add
+    // returns the first comm_ctrl_msg encountered
+    // only polls until SOME message is enqueued
+    pub(super) fn try_recv_any_comms(
         &mut self,
         logger: &mut dyn Logger,
-        deadline: Option<Instant>,
-    ) -> Result<(usize, Msg), TryRecyAnyError> {
-        use TryRecyAnyError as Trea;
-        // 1. try messages already buffered
-        if let Some(x) = self.undelayed_messages.pop() {
-            endptlog!(logger, "RECV undelayed_msg {:?}", &x);
-            return Ok(x);
-        }
-        loop {
-            // 2. try read a message from an endpoint that raised an event with poll() but wasn't drained
-            while let Some(index) = self.polled_undrained.pop() {
-                let net_endpoint = &mut self.net_endpoint_exts[index].net_endpoint;
-                if let Some(msg) = net_endpoint
-                    .try_recv(logger)
-                    .map_err(|error| Trea::EndpointError { error, index })?
-                {
-                    endptlog!(logger, "RECV polled_undrained {:?}", &msg);
-                    if !net_endpoint.inbox.is_empty() {
-                        // there may be another message waiting!
-                        self.polled_undrained.insert(index);
+        port_info: &PortInfo,
+        round_ctx: &mut impl RoundCtxTrait,
+        round_index: usize,
+    ) -> Result<CommRecvOk, UnrecoverableSyncError> {
+        ///////////////////////////////////////////
+        impl EndpointManager {
+            fn handle_msg(
+                &mut self,
+                logger: &mut dyn Logger,
+                round_ctx: &mut impl RoundCtxTrait,
+                net_endpoint_index: usize,
+                msg: Msg,
+                round_index: usize,
+                some_message_enqueued: &mut bool,
+            ) -> Option<(usize, CommCtrlMsg)> {
+                let comm_msg_contents = match msg {
+                    Msg::SetupMsg(..) => return None,
+                    Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&round_index) {
+                        Ordering::Equal => comm_msg.contents,
+                        Ordering::Less => {
+                            log!(
+                                logger,
+                                "We are in round {}, but msg is for round {}. Discard",
+                                comm_msg.round_index,
+                                round_index,
+                            );
+                            return None;
+                        }
+                        Ordering::Greater => {
+                            log!(
+                                logger,
+                                "We are in round {}, but msg is for round {}. Buffer",
+                                comm_msg.round_index,
+                                round_index,
+                            );
+                            self.delayed_messages
+                                .push((net_endpoint_index, Msg::CommMsg(comm_msg)));
+                            return None;
+                        }
+                    },
+                };
+                match comm_msg_contents {
+                    CommMsgContents::CommCtrl(comm_ctrl_msg) => {
+                        Some((net_endpoint_index, comm_ctrl_msg))
+                    }
+                    CommMsgContents::SendPayload(send_payload_msg) => {
+                        let getter = self.net_endpoint_store.endpoint_exts[net_endpoint_index]
+                            .getter_for_incoming;
+                        round_ctx.getter_add(getter, send_payload_msg);
+                        *some_message_enqueued = true;
+                        None
                     }
-                    return Ok((index, msg));
                 }
             }
-            // 3. No message yet. Do we have enough time to poll?
-            let remaining = if let Some(deadline) = deadline {
-                Some(deadline.checked_duration_since(Instant::now()).ok_or(Trea::Timeout)?)
-            } else {
-                None
-            };
-            self.poll.poll(&mut self.events, remaining).map_err(|_| Trea::PollFailed)?;
-            for event in self.events.iter() {
-                let Token(index) = event.token();
-                self.polled_undrained.insert(index);
-                endptlog!(
+        }
+        use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use};
+        ///////////////////////////////////////////
+        let mut some_message_enqueued = false;
+        // try yield undelayed net message
+        while let Some((net_endpoint_index, msg)) = self.undelayed_messages.pop() {
+            if let Some((net_endpoint_index, msg)) = self.handle_msg(
+                logger,
+                round_ctx,
+                net_endpoint_index,
+                msg,
+                round_index,
+                &mut some_message_enqueued,
+            ) {
+                return Ok(CommRecvOk::NewControlMsg { net_endpoint_index, msg });
+            }
+        }
+        loop {
+            // try receive a net message
+            while let Some((net_endpoint_index, msg)) = self.try_recv_undrained_net(logger)? {
+                if let Some((net_endpoint_index, msg)) = self.handle_msg(
                     logger,
-                    "RECV poll event {:?} for endpoint index {:?}. undrained: {:?}",
-                    &event,
-                    index,
-                    self.polled_undrained.iter()
-                );
+                    round_ctx,
+                    net_endpoint_index,
+                    msg,
+                    round_index,
+                    &mut some_message_enqueued,
+                ) {
+                    return Ok(CommRecvOk::NewControlMsg { net_endpoint_index, msg });
+                }
+            }
+            // try receive a udp message
+            let recv_buffer = self.udp_in_buffer.as_mut_slice();
+            while let Some(index) = self.udp_endpoint_store.polled_undrained.pop() {
+                let ee = &mut self.udp_endpoint_store.endpoint_exts[index];
+                if let Some(bytes_written) = ee.sock.recv(recv_buffer).ok() {
+                    // I received a payload!
+                    self.udp_endpoint_store.polled_undrained.insert(index);
+                    let payload = Payload::from(&recv_buffer[..bytes_written]);
+                    let [branch_spec_var, port_spec_var] = [
+                        ee.incoming_round_spec_var.unwrap(), // should not be None
+                        port_info.spec_var_for(ee.getter_for_incoming),
+                    ];
+                    let branch_spec_val = SpecVal::nth_domain_element(ee.incoming_payloads.len());
+                    ee.incoming_payloads.push(payload.clone());
+                    let predicate = Predicate::default()
+                        .inserted(branch_spec_var, branch_spec_val)
+                        .inserted(port_spec_var, SpecVal::FIRING);
+                    round_ctx
+                        .getter_add(ee.getter_for_incoming, SendPayloadMsg { payload, predicate });
+                    some_message_enqueued = true;
+                }
+            }
+            if some_message_enqueued {
+                return Ok(CommRecvOk::NewPayloadMsgs);
+            }
+            // poll if time remains
+            match self.poll_and_populate(logger, round_ctx.get_deadline()) {
+                Ok(()) => {} // continue looping
+                Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew),
+                Err(Pape::PollFailed) => return Err(Use::PollFailed),
+            }
+        }
+    }
+    fn try_recv_undrained_net(
+        &mut self,
+        logger: &mut dyn Logger,
+    ) -> Result<Option<(usize, Msg)>, TryRecvAnyNetError> {
+        while let Some(index) = self.net_endpoint_store.polled_undrained.pop() {
+            let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint;
+            if let Some(msg) = net_endpoint
+                .try_recv(logger)
+                .map_err(|error| TryRecvAnyNetError { error, index })?
+            {
+                endptlog!(logger, "RECV polled_undrained {:?}", &msg);
+                if !net_endpoint.inbox.is_empty() {
+                    // there may be another message waiting!
+                    self.net_endpoint_store.polled_undrained.insert(index);
+                }
+                return Ok(Some((index, msg)));
             }
-            self.events.clear();
         }
+        Ok(None)
+    }
+    fn poll_and_populate(
+        &mut self,
+        logger: &mut dyn Logger,
+        deadline: &Option<Instant>,
+    ) -> Result<(), PollAndPopulateError> {
+        use PollAndPopulateError as Pape;
+        // No message yet. Do we have enough time to poll?
+        let remaining = if let Some(deadline) = deadline {
+            Some(deadline.checked_duration_since(Instant::now()).ok_or(Pape::Timeout)?)
+        } else {
+            None
+        };
+        // Yes we do! Poll with remaining time as poll deadline
+        self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?;
+        for event in self.events.iter() {
+            match TokenTarget::from(event.token()) {
+                TokenTarget::Waker => {
+                    // Can ignore. Residual event from endpoint manager setup procedure
+                }
+                TokenTarget::NetEndpoint { index } => {
+                    self.net_endpoint_store.polled_undrained.insert(index);
+                    endptlog!(
+                        logger,
+                        "RECV poll event {:?} for NET endpoint index {:?}. undrained: {:?}",
+                        &event,
+                        index,
+                        self.net_endpoint_store.polled_undrained.iter()
+                    );
+                }
+                TokenTarget::UdpEndpoint { index } => {
+                    self.udp_endpoint_store.polled_undrained.insert(index);
+                    endptlog!(
+                        logger,
+                        "RECV poll event {:?} for UDP endpoint index {:?}. undrained: {:?}",
+                        &event,
+                        index,
+                        self.udp_endpoint_store.polled_undrained.iter()
+                    );
+                }
+            }
+        }
+        self.events.clear();
+        Ok(())
     }
     pub(super) fn undelay_all(&mut self) {
         if self.undelayed_messages.is_empty() {
@@ -175,7 +339,79 @@ impl EndpointManager {
         // slow path
         self.undelayed_messages.extend(self.delayed_messages.drain(..));
     }
+    pub(super) fn udp_endpoints_round_start(
+        &mut self,
+        logger: &mut dyn Logger,
+        spec_var_stream: &mut SpecVarStream,
+    ) {
+        log!(
+            logger,
+            "Starting round for {} udp endpoints",
+            self.udp_endpoint_store.endpoint_exts.len()
+        );
+        for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() {
+            let spec_var = spec_var_stream.next();
+            log!(logger, "Udp endpoint {} given spec var {:?} for this round", index, spec_var);
+            ee.incoming_round_spec_var = Some(spec_var);
+        }
+    }
+    pub(super) fn udp_endpoints_round_end(
+        &mut self,
+        logger: &mut dyn Logger,
+        decision: &Decision,
+    ) -> Result<(), UnrecoverableSyncError> {
+        log!(
+            logger,
+            "Ending round for {} udp endpoints",
+            self.udp_endpoint_store.endpoint_exts.len()
+        );
+        use UnrecoverableSyncError as Use;
+        if let Decision::Success(solution_predicate) = decision {
+            'endpoint_loop: for (index, ee) in
+                self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate()
+            {
+                ee.incoming_round_spec_var = None; // shouldn't be accessed before it's overwritten next round; still clearing it for clarity.
+                for (payload_predicate, payload) in ee.outgoing_payloads.drain() {
+                    if payload_predicate.assigns_subset(solution_predicate) {
+                        ee.sock
+                            .send(payload.as_slice())
+                            .map_err(|_| Use::BrokenUdpEndpoint { index })?;
+                        log!(
+                            logger,
+                            "Sent payload {:?} with pred {:?} through Udp endpoint {}",
+                            &payload,
+                            &payload_predicate,
+                            index
+                        );
+                        continue 'endpoint_loop; // send at most one payload per endpoint per round
+                    }
+                }
+                log!(logger, "Sent no message through Udp endpoint {}", index);
+            }
+        }
+        Ok(())
+    }
 }
+// impl UdpEndpointExt {
+//     fn try_recv(
+//         &mut self,
+//         port_info: &PortInfo,
+//         udp_in_buffer: &mut UdpInBuffer,
+//     ) -> Option<SendPayloadMsg> {
+//         let recv_buffer = udp_in_buffer.as_mut_slice();
+//         let len = self.sock.recv(recv_buffer).ok()?;
+//         let payload = Payload::from(&recv_buffer[..len]);
+//         let branch_spec_var = self
+//             .incoming_round_spec_var
+//             .expect("Udp spec var should be Some(..) if recv() is called");
+//         let branch_spec_val = SpecVal::nth_domain_element(self.incoming_payloads.len());
+//         self.incoming_payloads.push(payload.clone());
+//         let predicate = Predicate::default()
+//             .inserted(branch_spec_var, branch_spec_val)
+//             .inserted(port_info.spec_var_for(self.getter_for_incoming), SpecVal::FIRING);
+//         Some(SendPayloadMsg { payload, predicate })
+//     }
+// }
 impl Debug for NetEndpoint {
     fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
         f.debug_struct("Endpoint").field("inbox", &self.inbox).finish()
@@ -203,3 +439,18 @@ impl Into<Msg> for SetupMsg {
         Msg::SetupMsg(self)
     }
 }
+impl From<PollAndPopulateError> for ConnectError {
+    fn from(pape: PollAndPopulateError) -> ConnectError {
+        use {ConnectError as Ce, PollAndPopulateError as Pape};
+        match pape {
+            Pape::PollFailed => Ce::PollFailed,
+            Pape::Timeout => Ce::Timeout,
+        }
+    }
+}
+impl From<TryRecvAnyNetError> for UnrecoverableSyncError {
+    fn from(trane: TryRecvAnyNetError) -> UnrecoverableSyncError {
+        let TryRecvAnyNetError { index, .. } = trane;
+        UnrecoverableSyncError::BrokenNetEndpoint { index }
+    }
+}
diff --git a/src/runtime/error.rs b/src/runtime/error.rs
index e51e4020b14139b3b0c154b175767b928cb75190..c21fa7b9801fbbb80fa7542541210a70c5fa3a2f 100644
--- a/src/runtime/error.rs
+++ b/src/runtime/error.rs
@@ -9,7 +9,7 @@ pub enum ConnectError {
     AcceptFailed(SocketAddr),
     AlreadyConnected,
     PortPeerPolarityMismatch(PortId),
-    EndpointSetupError(SocketAddr, EndpointError),
+    NetEndpointSetupError(SocketAddr, NetEndpointError),
     SetupAlgMisbehavior,
 }
 #[derive(Eq, PartialEq, Copy, Clone, Debug)]
@@ -26,7 +26,8 @@ pub enum AddComponentError {
 #[derive(Debug, Clone)]
 pub enum UnrecoverableSyncError {
     PollFailed,
-    BrokenEndpoint(usize),
+    BrokenNetEndpoint { index: usize },
+    BrokenUdpEndpoint { index: usize },
     MalformedStateError(MalformedStateError),
 }
 #[derive(Debug, Clone)]
@@ -42,9 +43,9 @@ pub enum MalformedStateError {
     GetterUnknownFor { putter: PortId },
 }
 #[derive(Debug, Clone)]
-pub enum EndpointError {
+pub enum NetEndpointError {
     MalformedMessage,
-    BrokenEndpoint,
+    BrokenNetEndpoint,
 }
 #[derive(Debug)]
 pub enum PortOpError {
diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs
index 592040558a0a2487e01a595f95fc52ce4fc4c610..6ae66019b5754d73bf9fd9c1e5d992d8304c83c4 100644
--- a/src/runtime/mod.rs
+++ b/src/runtime/mod.rs
@@ -123,6 +123,10 @@ struct CommMsg {
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 enum CommMsgContents {
     SendPayload(SendPayloadMsg),
+    CommCtrl(CommCtrlMsg),
+}
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+enum CommCtrlMsg {
     Suggest { suggestion: Decision }, // SINKWARD
     Announce { decision: Decision }, // SINKAWAYS
 }
@@ -176,6 +180,10 @@ struct IdManager {
     proto_component_suffix_stream: U32Stream,
 }
 #[derive(Debug)]
+struct UdpInBuffer {
+    byte_vec: Vec<u8>,
+}
+#[derive(Debug)]
 struct SpecVarStream {
     connector_id: ConnectorId,
     port_suffix_stream: U32Stream,
@@ -187,16 +195,24 @@ struct EndpointManager {
     // 2. Events is empty
     poll: Poll,
     events: Events,
-    polled_undrained: VecSet<usize>,
     delayed_messages: Vec<(usize, Msg)>,
     undelayed_messages: Vec<(usize, Msg)>,
-    net_endpoint_exts: Vec<NetEndpointExt>,
+    net_endpoint_store: EndpointStore<NetEndpointExt>,
+    udp_endpoint_store: EndpointStore<UdpEndpointExt>,
+    udp_in_buffer: UdpInBuffer,
+}
+#[derive(Debug)]
+struct EndpointStore<T> {
+    endpoint_exts: Vec<T>,
+    polled_undrained: VecSet<usize>,
 }
+#[derive(Debug)]
 struct UdpEndpointExt {
-    sock: UdpSocket,
+    sock: UdpSocket, // already bound and connected
+    outgoing_payloads: HashMap<Predicate, Payload>,
+    incoming_round_spec_var: Option<SpecVar>,
     getter_for_incoming: PortId,
-    outgoing_buffer: HashMap<Predicate, Payload>,
-    incoming_buffer: Vec<Payload>,
+    incoming_payloads: Vec<Payload>,
 }
 #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
 struct PortInfo {
@@ -247,6 +263,15 @@ enum TokenTarget {
     UdpEndpoint { index: usize },
     Waker,
 }
+trait RoundCtxTrait {
+    fn get_deadline(&self) -> &Option<Instant>;
+    fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg);
+}
+enum CommRecvOk {
+    TimeoutWithoutNew,
+    NewPayloadMsgs,
+    NewControlMsg { net_endpoint_index: usize, msg: CommCtrlMsg },
+}
 ////////////////
 fn would_block(err: &std::io::Error) -> bool {
     err.kind() == std::io::ErrorKind::WouldBlock
@@ -258,7 +283,7 @@ impl TokenTarget {
 }
 impl From<Token> for TokenTarget {
     fn from(Token(index): Token) -> Self {
-        if index == Self::MAX_INDEX {
+        if index == Self::WAKER_TOKEN {
             TokenTarget::Waker
         } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) {
             TokenTarget::UdpEndpoint { index: shifted }
@@ -270,7 +295,7 @@ impl From<Token> for TokenTarget {
 impl Into<Token> for TokenTarget {
     fn into(self) -> Token {
         match self {
-            TokenTarget::Waker => Token(Self::MAX_INDEX),
+            TokenTarget::Waker => Token(Self::WAKER_TOKEN),
             TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX),
             TokenTarget::NetEndpoint { index } => Token(index),
         }
@@ -426,6 +451,17 @@ impl Predicate {
         self.assigned.insert(k, v);
         self
     }
+
+    pub fn assigns_subset(&self, maybe_superset: &Self) -> bool {
+        for (var, val) in self.assigned.iter() {
+            match maybe_superset.assigned.get(var) {
+                Some(val2) if val2 == val => {}
+                _ => return false,
+            }
+        }
+        true
+    }
+
     // returns true IFF self.unify would return Equivalent OR FormerNotLatter
     pub fn consistent_with(&self, other: &Self) -> bool {
         let [larger, smaller] =
@@ -557,6 +593,10 @@ impl SpecVal {
         self == Self::FIRING
         // all else treated as SILENT
     }
+    fn nth_domain_element(n: usize) -> Self {
+        let n: u16 = n.try_into().unwrap();
+        SpecVal(n)
+    }
     fn iter_domain() -> impl Iterator<Item = Self> {
         (0..).map(SpecVal)
     }
@@ -566,3 +606,19 @@ impl Debug for SpecVal {
         self.0.fmt(f)
     }
 }
+impl Default for UdpInBuffer {
+    fn default() -> Self {
+        let mut byte_vec = Vec::with_capacity(Self::CAPACITY);
+        unsafe {
+            // safe! this vector is guaranteed to have sufficient capacity
+            byte_vec.set_len(Self::CAPACITY);
+        }
+        Self { byte_vec }
+    }
+}
+impl UdpInBuffer {
+    const CAPACITY: usize = u16::MAX as usize;
+    fn as_mut_slice(&mut self) -> &mut [u8] {
+        self.byte_vec.as_mut_slice()
+    }
+}
diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs
index 872fa77ef59d5ab5c5c4f3fc5a86d48d60e677a6..32ea1ac6d99fd1954189264e22daf4605ce0545c 100644
--- a/src/runtime/setup.rs
+++ b/src/runtime/setup.rs
@@ -27,15 +27,27 @@ impl Connector {
     }
     pub fn new_udp_port(
         &mut self,
+        polarity: Polarity,
         local_addr: SocketAddr,
         peer_addr: SocketAddr,
-    ) -> Result<[PortId; 2], WrongStateError> {
-        let Self { unphased: _up, phased } = self;
+    ) -> Result<PortId, WrongStateError> {
+        let Self { unphased: cu, phased } = self;
         match phased {
             ConnectorPhased::Communication(..) => Err(WrongStateError),
-            ConnectorPhased::Setup(_setup) => {
-                let _udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr };
-                todo!()
+            ConnectorPhased::Setup(setup) => {
+                let udp_endpoint_setup = UdpEndpointSetup { local_addr, peer_addr };
+                let udp_index = setup.udp_endpoint_setups.len();
+                let [port_nat, port_udp] =
+                    [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()];
+                cu.native_ports.insert(port_nat);
+                cu.port_info.peers.insert(port_nat, port_udp);
+                cu.port_info.peers.insert(port_udp, port_nat);
+                cu.port_info.routes.insert(port_nat, Route::LocalComponent(ComponentId::Native));
+                cu.port_info.routes.insert(port_udp, Route::UdpEndpoint { index: udp_index });
+                cu.port_info.polarities.insert(port_nat, polarity);
+                cu.port_info.polarities.insert(port_udp, !polarity);
+                setup.udp_endpoint_setups.push((port_nat, udp_endpoint_setup));
+                Ok(port_nat)
             }
         }
     }
@@ -45,18 +57,18 @@ impl Connector {
         sock_addr: SocketAddr,
         endpoint_polarity: EndpointPolarity,
     ) -> Result<PortId, WrongStateError> {
-        let Self { unphased: up, phased } = self;
+        let Self { unphased: cu, phased } = self;
         match phased {
             ConnectorPhased::Communication(..) => Err(WrongStateError),
             ConnectorPhased::Setup(setup) => {
                 let endpoint_setup = NetEndpointSetup { sock_addr, endpoint_polarity };
-                let p = up.id_manager.new_port_id();
-                up.native_ports.insert(p);
+                let p = cu.id_manager.new_port_id();
+                cu.native_ports.insert(p);
                 // {polarity, route} known. {peer} unknown.
-                up.port_info.polarities.insert(p, polarity);
-                up.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native));
+                cu.port_info.polarities.insert(p, polarity);
+                cu.port_info.routes.insert(p, Route::LocalComponent(ComponentId::Native));
                 log!(
-                    up.logger,
+                    cu.logger,
                     "Added net port {:?} with polarity {:?} and endpoint setup {:?} ",
                     p,
                     polarity,
@@ -83,19 +95,19 @@ impl Connector {
                     &mut *cu.logger,
                     &setup.net_endpoint_setups,
                     &mut cu.port_info,
-                    deadline,
+                    &deadline,
                 )?;
                 log!(
                     cu.logger,
                     "Successfully connected {} endpoints",
-                    endpoint_manager.net_endpoint_exts.len()
+                    endpoint_manager.net_endpoint_store.endpoint_exts.len()
                 );
                 // leader election and tree construction
                 let neighborhood = init_neighborhood(
                     cu.id_manager.connector_id,
                     &mut *cu.logger,
                     &mut endpoint_manager,
-                    deadline,
+                    &deadline,
                 )?;
                 log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood);
                 let mut comm = ConnectorCommunication {
@@ -106,7 +118,7 @@ impl Connector {
                     round_result: Ok(None),
                 };
                 if cfg!(feature = "session_optimization") {
-                    session_optimize(cu, &mut comm, deadline)?;
+                    session_optimize(cu, &mut comm, &deadline)?;
                 }
                 log!(cu.logger, "connect() finished. setup phase complete");
                 self.phased = ConnectorPhased::Communication(Box::new(comm));
@@ -119,7 +131,7 @@ fn new_endpoint_manager(
     logger: &mut dyn Logger,
     endpoint_setups: &[(PortId, NetEndpointSetup)],
     port_info: &mut PortInfo,
-    deadline: Option<Instant>,
+    deadline: &Option<Instant>,
 ) -> Result<EndpointManager, ConnectError> {
     ////////////////////////////////////////////
     use std::sync::atomic::AtomicBool;
@@ -173,7 +185,8 @@ fn new_endpoint_manager(
     let mut waker_state: Option<Arc<WakerState>> = None;
     let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?;
     let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4);
-    let mut polled_undrained = VecSet::default();
+    let mut net_polled_undrained = VecSet::default();
+    let udp_polled_undrained = VecSet::default();
    let mut delayed_messages = vec![];
 
     // 2. create a registered (TcpListener/Endpoint) for passive / active respectively
@@ -334,7 +347,7 @@ fn new_endpoint_manager(
                             net_endpoint
                                 .send(&msg)
                                 .map_err(|e| {
-                                    Ce::EndpointSetupError(
+                                    Ce::NetEndpointSetupError(
                                         net_endpoint.stream.local_addr().unwrap(),
                                         e,
                                     )
@@ -346,10 +359,13 @@ fn new_endpoint_manager(
                     if event.is_readable() && todo.recv_peer_port.is_none() {
                         // can read and didn't recv setup msg yet? Do so!
                         let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| {
-                            Ce::EndpointSetupError(net_endpoint.stream.local_addr().unwrap(), e)
+                            Ce::NetEndpointSetupError(
+                                net_endpoint.stream.local_addr().unwrap(),
+                                e,
+                            )
                         })?;
                         if maybe_msg.is_some() && !net_endpoint.inbox.is_empty() {
-                            polled_undrained.insert(index);
+                            net_polled_undrained.insert(index);
                         }
                         match maybe_msg {
                             None => {} // msg deserialization incomplete
@@ -407,7 +423,14 @@ fn new_endpoint_manager(
         }
         events.clear();
     }
-    // all todos must be the NetEndpoint variants! unwrap and collect them
+    log!(logger, "Endpoint setup complete! Cleaning up and building structures");
+    if let Some(arc) = waker_state {
+        log!(logger, "Sending waker the stop signal");
+        arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst);
+        // TODO leave the waker registered?
+    }
+    let udp_endpoint_exts = vec![];
+
     let net_endpoint_exts = todos
         .into_iter()
         .enumerate()
@@ -425,18 +448,20 @@ fn new_endpoint_manager(
             getter_for_incoming: local_port,
         })
         .collect();
-    if let Some(arc) = waker_state {
-        log!(logger, "Sending waker the stop signal");
-        arc.continue_signal.store(false, std::sync::atomic::Ordering::SeqCst);
-        // TODO leave the waker registered?
-    }
     Ok(EndpointManager {
         poll,
         events,
-        polled_undrained,
         undelayed_messages: delayed_messages, // no longer delayed
         delayed_messages: Default::default(),
-        net_endpoint_exts,
+        net_endpoint_store: EndpointStore {
+            endpoint_exts: net_endpoint_exts,
+            polled_undrained: net_polled_undrained,
+        },
+        udp_endpoint_store: EndpointStore {
+            endpoint_exts: udp_endpoint_exts,
+            polled_undrained: udp_polled_undrained,
+        },
+        udp_in_buffer: Default::default(),
     })
 }
 
@@ -444,7 +469,7 @@ fn init_neighborhood(
     connector_id: ConnectorId,
     logger: &mut dyn Logger,
     em: &mut EndpointManager,
-    deadline: Option<Instant>,
+    deadline: &Option<Instant>,
 ) -> Result<Neighborhood, ConnectError> {
     ////////////////////////////////
     use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
@@ -645,7 +670,7 @@ fn init_neighborhood(
 fn session_optimize(
     cu: &mut ConnectorUnphased,
     comm: &mut ConnectorCommunication,
-    deadline: Option<Instant>,
+    deadline: &Option<Instant>,
 ) -> Result<(), ConnectError> {
     ////////////////////////////////////////
     use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm};
@@ -709,7 +734,8 @@ fn session_optimize(
             serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()),
             endpoint_incoming_to_getter: comm
                 .endpoint_manager
-                .net_endpoint_exts
+                .net_endpoint_store
+                .endpoint_exts
                 .iter()
                 .map(|ee| ee.getter_for_incoming)
                 .collect(),
@@ -798,8 +824,12 @@ fn apply_optimizations(
     cu.port_info = port_info;
     cu.proto_components = proto_components;
     cu.proto_description = serde_proto_description.0;
-    for (ee, getter) in
-        comm.endpoint_manager.net_endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter)
+    for (ee, getter) in comm
+        .endpoint_manager
+        .net_endpoint_store
+        .endpoint_exts
+        .iter_mut()
+        .zip(endpoint_incoming_to_getter)
     {
         ee.getter_for_incoming = getter;
     }
diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs
index 40558cf2521a8f05239f3857864bc5f116a32709..731c9c83b9b04b5889bf040029c6aa04391ceaaa 100644
--- a/src/runtime/tests.rs
+++ b/src/runtime/tests.rs
@@ -657,3 +657,13 @@ fn multi_recover() {
     })
     .unwrap();
 }
+
+#[test]
+fn udp_self_connect() {
+    let test_log_path = Path::new("./logs/udp_self_connect");
+    let sock_addrs = [next_test_addr(), next_test_addr()];
+    let mut c = file_logged_connector(0, test_log_path);
+    c.new_udp_port(Putter, sock_addrs[0], sock_addrs[1]).unwrap();
+    c.new_udp_port(Getter, sock_addrs[1], sock_addrs[0]).unwrap();
+    c.connect(SEC1).unwrap();
+}
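
Note on the `consistent_with` -> `assigns_subset` switch at the branch-collapse sites above: `consistent_with` is symmetric (it only rules out conflicting assignments between the two predicates), while `branch_predicate.assigns_subset(solution_predicate)` additionally demands that every variable the branch assigns is assigned identically in the solution. Below is a minimal standalone sketch of that check, with plain `HashMap`s standing in for the runtime's `Predicate`; the names here are illustrative only and not part of the crate.

use std::collections::HashMap;

// Illustrative stand-ins for the runtime's SpecVar / SpecVal newtypes.
type Var = u32;
type Val = u16;

// Mirrors the logic of Predicate::assigns_subset in the patch: every
// (var, val) assignment in `sub` must also appear, unchanged, in `sup`.
fn assigns_subset(sub: &HashMap<Var, Val>, sup: &HashMap<Var, Val>) -> bool {
    sub.iter().all(|(var, val)| sup.get(var) == Some(val))
}

fn main() {
    let branch: HashMap<Var, Val> = [(1, 7)].into_iter().collect();
    let solution: HashMap<Var, Val> = [(1, 7), (2, 0)].into_iter().collect();
    // Every assignment of the branch appears in the solution: it may collapse.
    assert!(assigns_subset(&branch, &solution));
    // The reverse fails: the solution assigns var 2, which the branch leaves
    // unassigned, so the two are merely consistent, not subset-related.
    assert!(!assigns_subset(&solution, &branch));
}

Collapsing on subset rather than mere consistency means a branch is only selected when the decided solution actually covers all of the branch's speculative assignments, such as the fresh per-round UDP spec vars handed out by udp_endpoints_round_start.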