diff --git a/src/common.rs b/src/common.rs index 3112933de493280ae1d39235bbbcb743cfbd8341..b282961aa8feb99f73617b4dcc8123d7d0e5bad9 100644 --- a/src/common.rs +++ b/src/common.rs @@ -100,8 +100,8 @@ pub(crate) enum SyncBlocker { PutMsg(PortId, Payload), NondetChoice { n: u16 }, } -struct DenseDebugHex<'a>(pub &'a [u8]); -struct DebuggableIter + Clone, T: Debug>(pub(crate) I); +pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]); +pub(crate) struct DebuggableIter + Clone, T: Debug>(pub(crate) I); ///////////////////// IMPL ///////////////////// impl IdParts for Id { fn id_parts(self) -> (ConnectorId, U32Suffix) { diff --git a/src/macros.rs b/src/macros.rs index efe5ce62c31163a0cef6c8340f05e14de173f927..d47dab2c43b1eca8cf7f72b6b08c3d773d5903ef 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -14,7 +14,9 @@ macro_rules! log { // } }}; (@ENDPT, $logger:expr, $($arg:tt)*) => {{ - // ignore + // if let Some(w) = $logger.line_writer() { + // let _ = writeln!(w, $($arg)*); + // } }}; ($logger:expr, $($arg:tt)*) => {{ // if let Some(w) = $logger.line_writer() { diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index f68f736670de9d4e1aa4d94d95ecf704878b1fa0..cc67e9f097d61e9c24ed3638b726d4773a308ad9 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -152,7 +152,7 @@ impl Connector { ) -> Result<&mut NativeBatch, PortOpError> { use PortOpError as Poe; let Self { unphased: cu, phased } = self; - let info = cu.ips.port_info.get(&port).ok_or(Poe::UnknownPolarity)?; + let info = cu.ips.port_info.map.get(&port).ok_or(Poe::UnknownPolarity)?; if info.owner != cu.native_component_id { return Err(Poe::PortUnavailable); } @@ -376,18 +376,18 @@ impl Connector { ); let firing_ports: HashSet = firing_iter.clone().collect(); for port in firing_iter { - let var = cu.ips.spec_var_for(port); + let var = cu.ips.port_info.spec_var_for(port); predicate.assigned.insert(var, SpecVal::FIRING); } // all silent ports have SpecVal::SILENT - for port in cu.ips.ports_owned_by(cu.native_component_id) { + for port in cu.ips.port_info.ports_owned_by(cu.native_component_id) { if firing_ports.contains(port) { // this one is FIRING continue; } - let var = cu.ips.spec_var_for(*port); + let var = cu.ips.port_info.spec_var_for(*port); if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { - log!(cu.logger(), "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); + log!(&mut *cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); continue 'native_branches; } } @@ -406,7 +406,7 @@ impl Connector { putter ); // sanity check - assert_eq!(Putter, cu.ips.port_info.get(&putter).unwrap().polarity); + assert_eq!(Putter, cu.ips.port_info.map.get(&putter).unwrap().polarity); rctx.putter_push(cu, putter, msg); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; @@ -580,7 +580,7 @@ impl Connector { log!(cu.logger(), "Decision loop! have {} messages to recv", rctx.payload_inbox.len()); while let Some((getter, send_payload_msg)) = rctx.getter_pop() { log!(@MARK, cu.logger(), "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg); - let getter_info = rctx.ips.port_info.get(&getter).unwrap(); + let getter_info = rctx.ips.port_info.map.get(&getter).unwrap(); let cid = getter_info.owner; // the id of the component owning `getter` port assert_eq!(Getter, getter_info.polarity); // sanity check log!( @@ -832,7 +832,7 @@ impl BranchingNative { bn_temp: MapTempGuard<'_, Predicate, NativeBranch>, ) { log!(cu.logger(), "feeding native getter {:?} {:?}", getter, &send_payload_msg); - assert_eq!(Getter, rctx.ips.port_info.get(&getter).unwrap().polarity); + assert_eq!(Getter, rctx.ips.port_info.map.get(&getter).unwrap().polarity); let mut draining = bn_temp; let finished = &mut self.branches; std::mem::swap(draining.0, finished); @@ -840,7 +840,7 @@ impl BranchingNative { // consistent with that of the received message. for (predicate, mut branch) in draining.drain() { log!(cu.logger(), "visiting native branch {:?} with {:?}", &branch, &predicate); - let var = rctx.ips.spec_var_for(getter); + let var = rctx.ips.port_info.spec_var_for(getter); if predicate.query(var) != Some(SpecVal::FIRING) { // optimization. Don't bother trying this branch, // because the resulting branch would have an inconsistent predicate. @@ -1046,7 +1046,7 @@ impl BranchingProtoComponent { } B::CouldntCheckFiring(port) => { // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned - let var = rctx.ips.spec_var_for(port); + let var = rctx.ips.port_info.spec_var_for(port); assert!(predicate.query(var).is_none()); // speculate on the two possible values of `var`. Schedule both branches to be rerun. drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone()); @@ -1054,9 +1054,9 @@ impl BranchingProtoComponent { } B::PutMsg(putter, payload) => { // sanity check: The given port indeed has `Putter` polarity - assert_eq!(Putter, rctx.ips.port_info.get(&putter).unwrap().polarity); + assert_eq!(Putter, rctx.ips.port_info.map.get(&putter).unwrap().polarity); // assign FIRING to this port's associated firing variable - let var = rctx.ips.spec_var_for(putter); + let var = rctx.ips.port_info.spec_var_for(putter); let was = predicate.assigned.insert(var, SpecVal::FIRING); if was == Some(SpecVal::SILENT) { // Discard the branch, as it clearly has contradictory requirements for this value. @@ -1079,8 +1079,8 @@ impl BranchingProtoComponent { B::SyncBlockEnd => { // This branch reached the end of it's synchronous block // assign all variables of owned ports that DIDN'T fire to SILENT - for port in rctx.ips.ports_owned_by(proto_component_id) { - let var = rctx.ips.spec_var_for(*port); + for port in rctx.ips.port_info.ports_owned_by(proto_component_id) { + let var = rctx.ips.port_info.spec_var_for(*port); let actually_exchanged = branch.inner.did_put_or_get.contains(port); let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); let speculated_to_fire = val == SpecVal::FIRING; @@ -1195,7 +1195,7 @@ impl BranchingProtoComponent { BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; // swap the blocked branches back std::mem::swap(blocked.0, &mut self.branches); - log!(cu.logger(), "component settles down with branches: {:?}", branches.keys()); + log!(cu.logger(), "component settles down with branches: {:?}", self.branches.keys()); Ok(()) } @@ -1362,7 +1362,7 @@ impl NonsyncProtoContext<'_> { for port in moved_ports.iter() { assert_eq!( self.proto_component_id, - self.ips.port_info.get(port).unwrap().owner + self.ips.port_info.map.get(port).unwrap().owner ); } // Create the new component, and schedule it to be run @@ -1378,7 +1378,7 @@ impl NonsyncProtoContext<'_> { self.unrun_components.push((new_cid, state)); // Update the ownership of the moved ports for port in moved_ports.iter() { - self.ips.port_info.get_mut(port).unwrap().owner = new_cid; + self.ips.port_info.map.get_mut(port).unwrap().owner = new_cid; } } @@ -1388,7 +1388,7 @@ impl NonsyncProtoContext<'_> { // adds two new associated ports, related to each other, and exposed to the proto component let mut new_cid_fn = || self.ips.id_manager.new_port_id(); let [o, i] = [new_cid_fn(), new_cid_fn()]; - self.ips.port_info.insert( + self.ips.port_info.map.insert( o, PortInfo { route: Route::LocalComponent, @@ -1397,7 +1397,7 @@ impl NonsyncProtoContext<'_> { owner: self.proto_component_id, }, ); - self.ips.port_info.insert( + self.ips.port_info.map.insert( i, PortInfo { route: Route::LocalComponent, @@ -1420,7 +1420,7 @@ impl SyncProtoContext<'_> { // The component calls the runtime back, inspecting whether it's associated // preidcate has already determined a (speculative) value for the given port's firing variable. pub(crate) fn is_firing(&mut self, port: PortId) -> Option { - let var = self.rctx.ips.spec_var_for(port); + let var = self.rctx.ips.port_info.spec_var_for(port); self.predicate.query(var).map(SpecVal::is_firing) } diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index ce5dfa05290a337d006c69856dad3f5b27c9834f..ba6f6ca089cdc24c5fa84e7e6e8c585797294c37 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -1,28 +1,39 @@ use super::*; +// A wrapper for some Read type, delegating read calls +// to the contained Read structure, but snooping on +// the number of bytes it reads, to be inspected later. struct MonitoredReader { bytes: usize, r: R, } + enum PollAndPopulateError { PollFailed, Timeout, } + struct TryRecvAnyNetError { error: NetEndpointError, index: usize, } ///////////////////// impl NetEndpoint { + // Returns the bincode configuration the NetEndpoint uses pervasively + // for configuration on ser/de operations. fn bincode_opts() -> impl bincode::config::Options { + // uses variable-length encoding everywhere; great! bincode::config::DefaultOptions::default() } + + // Attempt to return some deserializable T-type from + // the inbox or network stream pub(super) fn try_recv( &mut self, logger: &mut dyn Logger, ) -> Result, NetEndpointError> { use NetEndpointError as Nee; - // populate inbox as much as possible + // populate inbox with bytes as much as possible (mio::TcpStream is nonblocking) let before_len = self.inbox.len(); 'read_loop: loop { let res = self.stream.read_to_end(&mut self.inbox); @@ -40,12 +51,16 @@ impl NetEndpoint { DenseDebugHex(&self.inbox[..before_len]), DenseDebugHex(&self.inbox[before_len..]), ); + // Try deserialize from the inbox, monitoring how many bytes + // the deserialiation process consumes. In the event of + // success, this makes clear where the message ends let mut monitored = MonitoredReader::from(&self.inbox[..]); use bincode::config::Options; match Self::bincode_opts().deserialize_from(&mut monitored) { Ok(msg) => { let msg_size = monitored.bytes_read(); - self.inbox.drain(0..(msg_size.try_into().unwrap())); + // inbox[..msg_size] was deserialized into one message! + self.inbox.drain(..msg_size); log!( @ENDPT, logger, @@ -59,12 +74,15 @@ impl NetEndpoint { } Err(e) => match *e { bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { + // Contents of inbox insufficient for deserializing a message Ok(None) } _ => Err(Nee::MalformedMessage), }, } } + + // Send the given serializable type into the stream pub(super) fn send( &mut self, msg: &T, @@ -86,6 +104,18 @@ impl EndpointManager { pub(super) fn num_net_endpoints(&self) -> usize { self.net_endpoint_store.endpoint_exts.len() } + + // Setup-phase particular send procedure. + // Used pervasively, allows for some brevity with the ? operator. + pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { + let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; + net_endpoint.send(msg).map_err(|err| { + ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err) + }) + } + + // Communication-phase particular send procedure. + // Used pervasively, allows for some brevity with the ? operator. pub(super) fn send_to_comms( &mut self, index: usize, @@ -95,12 +125,6 @@ impl EndpointManager { let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; net_endpoint.send(msg).map_err(|_| Use::BrokenNetEndpoint { index }) } - pub(super) fn send_to_setup(&mut self, index: usize, msg: &Msg) -> Result<(), ConnectError> { - let net_endpoint = &mut self.net_endpoint_store.endpoint_exts[index].net_endpoint; - net_endpoint.send(msg).map_err(|err| { - ConnectError::NetEndpointSetupError(net_endpoint.stream.local_addr().unwrap(), err) - }) - } /// Receive the first message of any kind at all. /// Why not return SetupMsg? Because often this message will be forwarded to several others, @@ -110,7 +134,8 @@ impl EndpointManager { logger: &mut dyn Logger, deadline: &Option, ) -> Result<(usize, Msg), ConnectError> { - /////////////////////////////////////////// + // helper function, mapping a TryRecvAnySetup type error + // into a ConnectError fn map_trane( trane: TryRecvAnyNetError, net_endpoint_store: &EndpointStore, @@ -124,7 +149,6 @@ impl EndpointManager { trane.error, ) } - /////////////////////////////////////////// // try yield undelayed net message if let Some(tup) = self.undelayed_messages.pop() { log!(@ENDPT, logger, "RECV undelayed_msg {:?}", &tup); @@ -156,7 +180,11 @@ impl EndpointManager { round_index: usize, ) -> Result { /////////////////////////////////////////// + // adds scoped functionality for EndpointManager impl EndpointManager { + // Given some Msg structure in a particular context, + // return a control message for the current round + // if its a payload message, buffer it instead fn handle_msg( &mut self, cu: &mut impl CuUndecided, @@ -167,10 +195,11 @@ impl EndpointManager { some_message_enqueued: &mut bool, ) -> Option<(usize, CommCtrlMsg)> { let comm_msg_contents = match msg { - Msg::SetupMsg(..) => return None, + Msg::SetupMsg(..) => return None, // discard setup messages Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(&round_index) { - Ordering::Equal => comm_msg.contents, + Ordering::Equal => comm_msg.contents, // ok, keep going Ordering::Less => { + // discard this message log!( cu.logger(), "We are in round {}, but msg is for round {}. Discard", @@ -180,6 +209,7 @@ impl EndpointManager { return None; } Ordering::Greater => { + // "delay" this message, enqueueing it for a future round log!( cu.logger(), "We are in round {}, but msg is for round {}. Buffer", @@ -191,9 +221,15 @@ impl EndpointManager { } }, }; + // inspect the contents of this contemporary message, sorting it match comm_msg_contents { - CommMsgContents::CommCtrl(comm_ctrl_msg) => Some((net_index, comm_ctrl_msg)), + CommMsgContents::CommCtrl(comm_ctrl_msg) => { + // yes! this is a CommCtrlMsg + Some((net_index, comm_ctrl_msg)) + } CommMsgContents::SendPayload(send_payload_msg) => { + // Enqueue this payload message + // Still not a CommCtrlMsg, so return None let getter = self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming; rctx.getter_push(getter, send_payload_msg); @@ -206,7 +242,7 @@ impl EndpointManager { use {PollAndPopulateError as Pape, UnrecoverableSyncError as Use}; /////////////////////////////////////////// let mut some_message_enqueued = false; - // try yield undelayed net message + // pop undelayed messages, handling them. Return the first CommCtrlMsg popped while let Some((net_index, msg)) = self.undelayed_messages.pop() { if let Some((net_index, msg)) = self.handle_msg(cu, rctx, net_index, msg, round_index, &mut some_message_enqueued) @@ -215,7 +251,8 @@ impl EndpointManager { } } loop { - // try receive a net message + // drain endpoints of incoming messages (without blocking) + // return first CommCtrlMsg received while let Some((net_index, msg)) = self.try_recv_undrained_net(cu.logger())? { if let Some((net_index, msg)) = self.handle_msg( cu, @@ -237,7 +274,7 @@ impl EndpointManager { self.udp_endpoint_store.polled_undrained.insert(index); if !ee.received_this_round { let payload = Payload::from(&recv_buffer[..bytes_written]); - let port_spec_var = rctx.ips.spec_var_for(ee.getter_for_incoming); + let port_spec_var = rctx.ips.port_info.spec_var_for(ee.getter_for_incoming); let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING); rctx.getter_push( ee.getter_for_incoming, @@ -261,6 +298,8 @@ impl EndpointManager { } } } + + // Try receive some message from any net endpoint without blocking fn try_recv_undrained_net( &mut self, logger: &mut dyn Logger, @@ -281,6 +320,9 @@ impl EndpointManager { } Ok(None) } + + // Poll the network, raising `polled_undrained` flags for endpoints + // as they receive events. fn poll_and_populate( &mut self, logger: &mut dyn Logger, @@ -297,9 +339,6 @@ impl EndpointManager { self.poll.poll(&mut self.events, remaining).map_err(|_| Pape::PollFailed)?; for event in self.events.iter() { match TokenTarget::from(event.token()) { - TokenTarget::Waker => { - // Can ignore. Residual event from endpoint manager setup procedure - } TokenTarget::NetEndpoint { index } => { self.net_endpoint_store.polled_undrained.insert(index); log!( @@ -327,6 +366,8 @@ impl EndpointManager { self.events.clear(); Ok(()) } + + // Move all delayed messages to undelayed, making it possible to yield them pub(super) fn undelay_all(&mut self) { if self.undelayed_messages.is_empty() { // fast path @@ -336,6 +377,8 @@ impl EndpointManager { // slow path self.undelayed_messages.extend(self.delayed_messages.drain(..)); } + + // End the synchronous round for the udp endpoints given the round decision pub(super) fn udp_endpoints_round_end( &mut self, logger: &mut dyn Logger, @@ -349,6 +392,11 @@ impl EndpointManager { ); use UnrecoverableSyncError as Use; if let Decision::Success(solution_predicate) = decision { + // Similar to a native component, we commit the branch of the component + // consistent with the predicate decided upon, making its effects visible + // to the world outside the connector's internals. + // In this case, this takes the form of emptying the component's outbox buffer, + // actually sending payloads 'on the wire' as UDP messages. for (index, ee) in self.udp_endpoint_store.endpoint_exts.iter_mut().enumerate() { 'outgoing_loop: for (payload_predicate, payload) in ee.outgoing_payloads.drain() { if payload_predicate.assigns_subset(solution_predicate) { diff --git a/src/runtime/logging.rs b/src/runtime/logging.rs index cdbe2b3611125add0c82d5ce7da8990742b8f0a3..7a97cb4fdc5ea40ca161798a889e2afb24d3c037 100644 --- a/src/runtime/logging.rs +++ b/src/runtime/logging.rs @@ -1,5 +1,6 @@ use super::*; +// Used in the loggers' format string fn secs_since_unix_epoch() -> f64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 3b67516160fab59fc5658dcd6e9501127e0e4005..8cc96f1cfbb5f3c577b2237f1d7d8c04a8bad4e7 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -147,7 +147,7 @@ enum SetupMsg { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] struct SessionInfo { serde_proto_description: SerdeProtocolDescription, - port_info: HashMap, + port_info: PortInfoMap, endpoint_incoming_to_getter: Vec, proto_components: HashMap, } @@ -313,12 +313,19 @@ struct MyPortInfo { owner: ComponentId, } +// Newtype around port info map, allowing the implementation of some +// useful methods +#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)] +struct PortInfoMap { + map: HashMap, +} + // A convenient substructure for containing port info and the ID manager. // Houses the bulk of the connector's persistent state between rounds. // It turns out several situations require access to both things. #[derive(Debug, Clone)] struct IdAndPortState { - port_info: HashMap, + port_info: PortInfoMap, id_manager: IdManager, } @@ -434,7 +441,6 @@ struct NativeBatch { enum TokenTarget { NetEndpoint { index: usize }, UdpEndpoint { index: usize }, - Waker, } // Returned by the endpoint manager as a result of comm_recv, telling the connector what happened, @@ -475,18 +481,15 @@ impl VecSet { self.vec.pop() } } -impl IdAndPortState { +impl PortInfoMap { fn ports_owned_by(&self, owner: ComponentId) -> impl Iterator { - self.port_info - .iter() - .filter(move |(_, port_info)| port_info.owner == owner) - .map(|(port, _)| port) + self.map.iter().filter(move |(_, port_info)| port_info.owner == owner).map(|(port, _)| port) } fn spec_var_for(&self, port: PortId) -> SpecVar { // Every port maps to a speculative variable // Two distinct ports map to the same variable // IFF they are two ends of the same logical channel. - let info = self.port_info.get(&port).unwrap(); + let info = self.map.get(&port).unwrap(); SpecVar(match info.polarity { Getter => port, Putter => info.peer.unwrap(), @@ -528,7 +531,7 @@ impl IdManager { } impl Drop for Connector { fn drop(&mut self) { - log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!"); + log!(self.unphased.logger(), "Connector dropping. Goodbye!"); } } // Given a slice of ports, return the first, if any, port is present repeatedly @@ -594,7 +597,7 @@ impl Connector { // - they are each others' peers // - they are owned by a local component with id `cid` // - polarity putter, getter respectively - cu.ips.port_info.insert( + cu.ips.port_info.map.insert( o, PortInfo { route: Route::LocalComponent, @@ -603,7 +606,7 @@ impl Connector { polarity: Putter, }, ); - cu.ips.port_info.insert( + cu.ips.port_info.map.insert( i, PortInfo { route: Route::LocalComponent, @@ -642,7 +645,7 @@ impl Connector { return Err(Ace::WrongNumberOfParamaters { expected: expected_polarities.len() }); } for (&expected_polarity, &port) in expected_polarities.iter().zip(ports.iter()) { - let info = cu.ips.port_info.get(&port).ok_or(Ace::UnknownPort(port))?; + let info = cu.ips.port_info.map.get(&port).ok_or(Ace::UnknownPort(port))?; if info.owner != cu.native_component_id { return Err(Ace::UnknownPort(port)); } @@ -658,7 +661,7 @@ impl Connector { .insert(new_cid, cu.proto_description.new_main_component(identifier, ports)); // update the ownership of moved ports for port in ports.iter() { - match cu.ips.port_info.get_mut(port) { + match cu.ips.port_info.map.get_mut(port) { Some(port_info) => port_info.owner = new_cid, None => unreachable!(), } @@ -787,7 +790,7 @@ impl RoundCtx { // buffer a message along with the ID of the putter who sent it fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) { - if let Some(getter) = self.ips.port_info.get(&putter).unwrap().peer { + if let Some(getter) = self.ips.port_info.map.get(&putter).unwrap().peer { log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter); self.getter_push(getter, msg); } else { diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 119c489e36c57a53a17c9571fcc6a7d809fd10bb..35561a07596524f63836ed3c36fe668052eeb457 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -1,16 +1,22 @@ use crate::common::*; use crate::runtime::*; +#[derive(Default)] +struct ExtraPortInfo { + info: HashMap, + peers: HashMap, +} + impl TokenTarget { + // subdivides the domain of usize into + // [NET_ENDPOINT][UDP_ENDPOINT ] + // ^0 ^usize::MAX/2 ^usize::MAX const HALFWAY_INDEX: usize = usize::MAX / 2; const MAX_INDEX: usize = usize::MAX; - const WAKER_TOKEN: usize = Self::MAX_INDEX; } impl From for TokenTarget { fn from(Token(index): Token) -> Self { - if index == Self::WAKER_TOKEN { - TokenTarget::Waker - } else if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) { + if let Some(shifted) = index.checked_sub(Self::HALFWAY_INDEX) { TokenTarget::UdpEndpoint { index: shifted } } else { TokenTarget::NetEndpoint { index } @@ -20,7 +26,6 @@ impl From for TokenTarget { impl Into for TokenTarget { fn into(self) -> Token { match self { - TokenTarget::Waker => Token(Self::WAKER_TOKEN), TokenTarget::UdpEndpoint { index } => Token(index + Self::HALFWAY_INDEX), TokenTarget::NetEndpoint { index } => Token(index), } @@ -60,6 +65,7 @@ impl Connector { })), } } + /// Conceptually, this returning [p0, g1] is sugar for: /// 1. create port pair [p0, g0] /// 2. create port pair [p1, g1] @@ -76,19 +82,12 @@ impl Connector { ConnectorPhased::Setup(setup) => { let udp_index = setup.udp_endpoint_setups.len(); let udp_cid = cu.ips.id_manager.new_component_id(); + // allocates 4 new port identifiers, two for each logical channel, + // one channel per direction (into and out of the component) let mut npid = || cu.ips.id_manager.new_port_id(); let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()]; - - cu.ips.port_info.insert( - nin, - PortInfo { - route: Route::LocalComponent, - polarity: Getter, - peer: Some(uout), - owner: cu.native_component_id, - }, - ); - cu.ips.port_info.insert( + // allocate the native->udp_mediator channel's ports + cu.ips.port_info.map.insert( nout, PortInfo { route: Route::LocalComponent, @@ -97,7 +96,7 @@ impl Connector { owner: cu.native_component_id, }, ); - cu.ips.port_info.insert( + cu.ips.port_info.map.insert( uin, PortInfo { route: Route::UdpEndpoint { index: udp_index }, @@ -106,7 +105,8 @@ impl Connector { owner: udp_cid, }, ); - cu.ips.port_info.insert( + // allocate the udp_mediator->native channel's ports + cu.ips.port_info.map.insert( uout, PortInfo { route: Route::UdpEndpoint { index: udp_index }, @@ -115,11 +115,23 @@ impl Connector { owner: udp_cid, }, ); + cu.ips.port_info.map.insert( + nin, + PortInfo { + route: Route::LocalComponent, + polarity: Getter, + peer: Some(uout), + owner: cu.native_component_id, + }, + ); + // allocate the two ports owned by the UdpMediator component + // Remember to setup this UdpEndpoint setup during `connect` later. setup.udp_endpoint_setups.push(UdpEndpointSetup { local_addr, peer_addr, getter_for_incoming: nin, }); + // Return the native's output, input port pair Ok([nout, nin]) } } @@ -138,8 +150,9 @@ impl Connector { match phased { ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { + // allocate a single dangling port with a `None` peer (for now) let new_pid = cu.ips.id_manager.new_port_id(); - cu.ips.port_info.insert( + cu.ips.port_info.map.insert( new_pid, PortInfo { route: Route::LocalComponent, @@ -156,6 +169,7 @@ impl Connector { &sock_addr, endpoint_polarity ); + // Remember to setup this NetEndpoint setup during `connect` later. setup.net_endpoint_setups.push(NetEndpointSetup { sock_addr, endpoint_polarity, @@ -186,11 +200,11 @@ impl Connector { log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); let deadline = timeout.map(|to| Instant::now() + to); // connect all endpoints in parallel; send and receive peer ids through ports - let mut endpoint_manager = new_endpoint_manager( + let (mut endpoint_manager, mut extra_port_info) = setup_endpoints_and_pair_ports( &mut *cu.logger, &setup.net_endpoint_setups, &setup.udp_endpoint_setups, - &mut cu.ips.port_info, + &cu.ips.port_info, &deadline, )?; log!( @@ -200,7 +214,8 @@ impl Connector { &cu.ips.port_info, &endpoint_manager, ); - // leader election and tree construction + // leader election and tree construction. Learn our role in the consensus tree, + // from learning who are our children/parents (neighbors) in the consensus tree. let neighborhood = init_neighborhood( cu.ips.id_manager.connector_id, &mut *cu.logger, @@ -208,80 +223,110 @@ impl Connector { &deadline, )?; log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); + // Put it all together with an initial round index of zero. let mut comm = ConnectorCommunication { round_index: 0, endpoint_manager, neighborhood, native_batches: vec![Default::default()], - round_result: Ok(None), + round_result: Ok(None), // no previous round yet }; if cfg!(feature = "session_optimization") { + // Perform the session optimization procedure, which may modify the + // internals of the connector, rerouting ports, moving around connectors etc. session_optimize(cu, &mut comm, &deadline)?; } log!(cu.logger, "connect() finished. setup phase complete"); + // Connect procedure successful! Commit changes by... + // ... commiting new port info for ConnectorUnphased + for (port, info) in extra_port_info.info.drain() { + cu.ips.port_info.map.insert(port, info); + } + for (port, peer) in extra_port_info.peers.drain() { + cu.ips.port_info.map.get_mut(&port).unwrap().peer = Some(peer); + } + // ... replacing the connector's phase to "communication" *phased = ConnectorPhased::Communication(Box::new(comm)); Ok(()) } } } } -fn new_endpoint_manager( + +// Given a set of net_ and udp_ endpoints to setup, +// port information to flesh out (by discovering peers through channels) +// and a deadline in which to do it, +// try to return: +// - An EndpointManager, containing all the set up endpoints +// - new information about ports acquired through the newly-created channels +fn setup_endpoints_and_pair_ports( logger: &mut dyn Logger, net_endpoint_setups: &[NetEndpointSetup], udp_endpoint_setups: &[UdpEndpointSetup], - port_info: &mut HashMap, + port_info: &PortInfoMap, deadline: &Option, -) -> Result { - //////////////////////////////////////////// - use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; +) -> Result<(EndpointManager, ExtraPortInfo), ConnectError> { use ConnectError as Ce; const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); - const WAKER_PERIOD: Duration = Duration::from_millis(300); - struct WakerState { - continue_signal: AtomicBool, - waker: mio::Waker, - } - impl WakerState { - fn waker_loop(&self) { - while self.continue_signal.load(SeqCst) { - std::thread::sleep(WAKER_PERIOD); - let _ = self.waker.wake(); - } - } - fn waker_stop(&self) { - self.continue_signal.store(false, SeqCst); - // TODO keep waker registered? - } - } - struct Todo { + const RETRY_PERIOD: Duration = Duration::from_millis(200); + + // The structure shared between this ("setup") thread and that of the waker. + // The waker thread periodically sends signals. + // struct WakerState { + // continue_signal: AtomicBool, + // waker: mio::Waker, + // } + // impl WakerState { + // // The waker thread runs this UNTIL the continue signal is set to false + // fn waker_loop(&self) { + // while self.continue_signal.load(SeqCst) { + // std::thread::sleep(WAKER_PERIOD); + // let _ = self.waker.wake(); + // } + // } + // // The setup thread thread runs this to set the continue signal to false. + // fn waker_stop(&self) { + // self.continue_signal.store(false, SeqCst); + // } + // } + + // The data for a net endpoint's setup in progress + struct NetTodo { // becomes completed once sent_local_port && recv_peer_port.is_some() // we send local port if we haven't already and we receive a writable event // we recv peer port if we haven't already and we receive a readbale event - todo_endpoint: TodoEndpoint, + todo_endpoint: NetTodoEndpoint, endpoint_setup: NetEndpointSetup, sent_local_port: bool, // true <-> I've sent my local port recv_peer_port: Option, // Some(..) <-> I've received my peer's port } + + // The data for a udp endpoint's setup in progress struct UdpTodo { // becomes completed once we receive our first writable event getter_for_incoming: PortId, sock: UdpSocket, } - enum TodoEndpoint { - Accepting(TcpListener), - NetEndpoint(NetEndpoint), + + // Substructure of `NetTodo`, which represents the endpoint itself + enum NetTodoEndpoint { + Accepting(TcpListener), // awaiting it's peer initiating the connection + PeerInfoRecving(NetEndpoint), // awaiting info about peer port through the channel } + //////////////////////////////////////////// - // 1. Start to construct EndpointManager - let mut waker_state: Option> = None; + // Start to construct our return values + // let mut waker_state: Option> = None; + let mut extra_port_info = ExtraPortInfo::default(); let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?; let mut events = Events::with_capacity((net_endpoint_setups.len() + udp_endpoint_setups.len()) * 2 + 4); let [mut net_polled_undrained, udp_polled_undrained] = [VecSet::default(), VecSet::default()]; let mut delayed_messages = vec![]; + let mut last_retry_at = Instant::now(); - // 2. Create net/udp TODOs, each already registered with poll + // Create net/udp todo structures, each already registered with poll let mut net_todos = net_endpoint_setups .iter() .enumerate() @@ -292,21 +337,21 @@ fn new_endpoint_manager( let mut stream = TcpStream::connect(endpoint_setup.sock_addr) .expect("mio::TcpStream connect should not fail!"); poll.registry().register(&mut stream, token, BOTH).unwrap(); - TodoEndpoint::NetEndpoint(NetEndpoint { stream, inbox: vec![] }) + NetTodoEndpoint::PeerInfoRecving(NetEndpoint { stream, inbox: vec![] }) } else { let mut listener = TcpListener::bind(endpoint_setup.sock_addr) .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?; poll.registry().register(&mut listener, token, BOTH).unwrap(); - TodoEndpoint::Accepting(listener) + NetTodoEndpoint::Accepting(listener) }; - Ok(Todo { + Ok(NetTodo { todo_endpoint, sent_local_port: false, recv_peer_port: None, endpoint_setup: endpoint_setup.clone(), }) }) - .collect::, ConnectError>>()?; + .collect::, ConnectError>>()?; let udp_todos = udp_endpoint_setups .iter() .enumerate() @@ -322,8 +367,8 @@ fn new_endpoint_manager( }) .collect::, ConnectError>>()?; - // Initially, (1) no net connections have failed, and (2) all udp and net endpoint setups are incomplete - let mut net_connect_retry_later: HashSet = Default::default(); + // Initially no net connections have failed, and all udp and net endpoint setups are incomplete + let mut net_connect_to_retry: HashSet = Default::default(); let mut setup_incomplete: HashSet = { let net_todo_targets_iter = (0..net_todos.len()).map(|index| TokenTarget::NetEndpoint { index }); @@ -333,47 +378,49 @@ fn new_endpoint_manager( }; // progress by reacting to poll events. continue until every endpoint is set up while !setup_incomplete.is_empty() { + // recompute the time left to poll for progress let remaining = if let Some(deadline) = deadline { - Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?) + deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?.min(RETRY_PERIOD) } else { - None + RETRY_PERIOD }; - poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?; + // block until either + // (a) `events` has been populated with 1+ elements + // (b) timeout elapses, or + // (c) RETRY_PERIOD elapses + poll.poll(&mut events, Some(remaining)).map_err(|_| Ce::PollFailed)?; + if last_retry_at.elapsed() > RETRY_PERIOD { + // Retry all net connections and reset `last_retry_at` + last_retry_at = Instant::now(); + for net_index in net_connect_to_retry.drain() { + // Restart connect procedure for this net endpoint + let net_todo = &mut net_todos[net_index]; + log!( + logger, + "Restarting connection with endpoint {:?} {:?}", + net_index, + net_todo.endpoint_setup.sock_addr + ); + match &mut net_todo.todo_endpoint { + NetTodoEndpoint::PeerInfoRecving(endpoint) => { + let mut new_stream = TcpStream::connect(net_todo.endpoint_setup.sock_addr) + .expect("mio::TcpStream connect should not fail!"); + std::mem::swap(&mut endpoint.stream, &mut new_stream); + let token = TokenTarget::NetEndpoint { index: net_index }.into(); + poll.registry().register(&mut endpoint.stream, token, BOTH).unwrap(); + } + _ => unreachable!(), + } + } + } for event in events.iter() { let token = event.token(); + // figure out which endpoint the event belonged to let token_target = TokenTarget::from(token); match token_target { - TokenTarget::Waker => { - log!( - logger, - "Notification from waker. connect_failed is {:?}", - net_connect_retry_later.iter() - ); - assert!(waker_state.is_some()); - for net_index in net_connect_retry_later.drain() { - let net_todo = &mut net_todos[net_index]; - log!( - logger, - "Restarting connection with endpoint {:?} {:?}", - net_index, - net_todo.endpoint_setup.sock_addr - ); - match &mut net_todo.todo_endpoint { - TodoEndpoint::NetEndpoint(endpoint) => { - let mut new_stream = - TcpStream::connect(net_todo.endpoint_setup.sock_addr) - .expect("mio::TcpStream connect should not fail!"); - std::mem::swap(&mut endpoint.stream, &mut new_stream); - let token = TokenTarget::NetEndpoint { index: net_index }.into(); - poll.registry() - .register(&mut endpoint.stream, token, BOTH) - .unwrap(); - } - _ => unreachable!(), - } - } - } TokenTarget::UdpEndpoint { index } => { + // UdpEndpoints are easy to complete. + // Their setup event just has to succeed without error if !setup_incomplete.contains(&token_target) { // spurious wakeup. this endpoint has already been set up! continue; @@ -385,9 +432,12 @@ fn new_endpoint_manager( setup_incomplete.remove(&token_target); } TokenTarget::NetEndpoint { index } => { + // NetEndpoints are complex to complete, + // they must accept/connect to their peer, + // and then exchange port info successfully let net_todo = &mut net_todos[index]; - if let TodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint { - // FIRST try complete this connection + if let NetTodoEndpoint::Accepting(listener) = &mut net_todo.todo_endpoint { + // Passive endpoint that will first try accept the peer's connection match listener.accept() { Err(e) if err_would_block(&e) => continue, // spurious wakeup Err(_) => { @@ -406,51 +456,32 @@ fn new_endpoint_manager( peer_addr ); let net_endpoint = NetEndpoint { stream, inbox: vec![] }; - net_todo.todo_endpoint = TodoEndpoint::NetEndpoint(net_endpoint); + net_todo.todo_endpoint = + NetTodoEndpoint::PeerInfoRecving(net_endpoint); } } } - if let TodoEndpoint::NetEndpoint(net_endpoint) = &mut net_todo.todo_endpoint { + // OK now let's try and finish exchanging port info + if let NetTodoEndpoint::PeerInfoRecving(net_endpoint) = + &mut net_todo.todo_endpoint + { if event.is_error() { + // event signals some error! :( if net_todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive { - // right now you cannot retry an acceptor. return failure + // breaking as the acceptor is currently unrecoverable return Err(Ce::AcceptFailed( net_endpoint.stream.local_addr().unwrap(), )); } // this actively-connecting endpoint failed to connect! - if net_connect_retry_later.insert(index) { - log!( - logger, - "Connection failed for {:?}. List is {:?}", - index, - net_connect_retry_later.iter() - ); - poll.registry().deregister(&mut net_endpoint.stream).unwrap(); - } else { - // spurious wakeup. already scheduled to retry connect later - continue; - } - if waker_state.is_none() { - log!(logger, "First connect failure. Starting waker thread"); - let arc = Arc::new(WakerState { - waker: mio::Waker::new( - poll.registry(), - TokenTarget::Waker.into(), - ) - .unwrap(), - continue_signal: true.into(), - }); - let moved_arc = arc.clone(); - waker_state = Some(arc); - std::thread::spawn(move || moved_arc.waker_loop()); - } + // We will schedule it for a retry + net_connect_to_retry.insert(index); continue; } // event wasn't ERROR - if net_connect_retry_later.contains(&index) { + if net_connect_to_retry.contains(&index) { // spurious wakeup. already scheduled to retry connect later continue; } @@ -462,8 +493,9 @@ fn new_endpoint_manager( continue; } let local_info = port_info - .get_mut(&net_todo.endpoint_setup.getter_for_incoming) - .unwrap(); + .map + .get(&net_todo.endpoint_setup.getter_for_incoming) + .expect("Net Setup's getter port info isn't known"); // unreachable if event.is_writable() && !net_todo.sent_local_port { // can write and didn't send setup msg yet? Do so! let msg = Msg::SetupMsg(SetupMsg::MyPortInfo(MyPortInfo { @@ -484,7 +516,7 @@ fn new_endpoint_manager( net_todo.sent_local_port = true; } if event.is_readable() && net_todo.recv_peer_port.is_none() { - // can read and didn't recv setup msg yet? Do so! + // can read and didn't finish recving setup msg yet? Do so! let maybe_msg = net_endpoint.try_recv(logger).map_err(|e| { Ce::NetEndpointSetupError( net_endpoint.stream.local_addr().unwrap(), @@ -509,15 +541,21 @@ fn new_endpoint_manager( )); } net_todo.recv_peer_port = Some(peer_info.port); - // 1. finally learned the peer of this port! - local_info.peer = Some(peer_info.port); - // 2. learned the info of this peer port - port_info.entry(peer_info.port).or_insert(PortInfo { - peer: Some(net_todo.endpoint_setup.getter_for_incoming), - polarity: peer_info.polarity, - owner: peer_info.owner, - route: Route::NetEndpoint { index }, - }); + // finally learned the peer of this port! + extra_port_info.peers.insert( + net_todo.endpoint_setup.getter_for_incoming, + peer_info.port, + ); + // learned the info of this peer port + if !port_info.map.contains_key(&peer_info.port) { + let info = PortInfo { + peer: Some(net_todo.endpoint_setup.getter_for_incoming), + polarity: peer_info.polarity, + owner: peer_info.owner, + route: Route::NetEndpoint { index }, + }; + extra_port_info.info.insert(peer_info.port, info); + } } Some(inappropriate_msg) => { log!( @@ -542,15 +580,12 @@ fn new_endpoint_manager( events.clear(); } log!(logger, "Endpoint setup complete! Cleaning up and building structures"); - if let Some(ws) = waker_state.take() { - ws.waker_stop(); - } let net_endpoint_exts = net_todos .into_iter() .enumerate() - .map(|(index, Todo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt { + .map(|(index, NetTodo { todo_endpoint, endpoint_setup, .. })| NetEndpointExt { net_endpoint: match todo_endpoint { - TodoEndpoint::NetEndpoint(mut net_endpoint) => { + NetTodoEndpoint::PeerInfoRecving(mut net_endpoint) => { let token = TokenTarget::NetEndpoint { index }.into(); poll.registry() .reregister(&mut net_endpoint.stream, token, Interest::READABLE) @@ -577,7 +612,7 @@ fn new_endpoint_manager( } }) .collect(); - Ok(EndpointManager { + let endpoint_manager = EndpointManager { poll, events, undelayed_messages: delayed_messages, // no longer delayed @@ -591,22 +626,33 @@ fn new_endpoint_manager( polled_undrained: udp_polled_undrained, }, udp_in_buffer: Default::default(), - }) + }; + Ok((endpoint_manager, extra_port_info)) } +// Given a fully-formed endpoint manager, +// construct the consensus tree with: +// 1. decentralized leader election +// 2. centralized tree construction fn init_neighborhood( connector_id: ConnectorId, logger: &mut dyn Logger, em: &mut EndpointManager, deadline: &Option, ) -> Result { - //////////////////////////////// use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; + + // storage structure for the state of a distributed wave + // (for readability) #[derive(Debug)] struct WaveState { parent: Option, leader: ConnectorId, } + + // kick off a leader-election wave rooted at myself + // given the desired wave information + // (e.g. don't inform my parent if they exist) fn do_wave( em: &mut EndpointManager, awaiting: &mut HashSet, @@ -661,6 +707,8 @@ fn init_neighborhood( log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { S(Sm::LeaderAnnounce { tree_leader }) => { + // A neighbor explicitly tells me who is the leader + // they become my parent, and I adopt their announced leader let election_result = WaveState { leader: tree_leader, parent: Some(recv_index) }; log!(logger, "Election lost! Result {:?}", &election_result); @@ -734,6 +782,8 @@ fn init_neighborhood( }; // starting algorithm 2. Send a message to every neighbor + // namely, send "YouAreMyParent" to parent (if they exist), + // and LeaderAnnounce to everyone else log!(logger, "Starting tree construction. Step 1: send one msg per neighbor"); awaiting.clear(); for index in em.index_iter() { @@ -747,6 +797,8 @@ fn init_neighborhood( )?; } } + // Receive one message from each neighbor to learn + // whether they consider me their parent or not. let mut children = vec![]; em.undelay_all(); while !awaiting.is_empty() { @@ -755,7 +807,7 @@ fn init_neighborhood( log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { S(Sm::LeaderAnnounce { .. }) => { - // not a child + // `recv_index` is not my child log!( logger, "Got reply from non-child index {:?}. Children: {:?}", @@ -776,6 +828,7 @@ fn init_neighborhood( ); return Err(Ce::SetupAlgMisbehavior); } + // `recv_index` is my child children.push(recv_index); } msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => { @@ -789,6 +842,7 @@ fn init_neighborhood( } } } + // Neighborhood complete! children.shrink_to_fit(); let neighborhood = Neighborhood { parent: election_result.parent, children: VecSet::new(children) }; @@ -796,14 +850,17 @@ fn init_neighborhood( Ok(neighborhood) } +// Connectors collect a map of type ConnectorId=>SessionInfo, +// representing a global view of the session's state at the leader. +// The leader rewrites its contents however they like (currently: nothing happens) +// and the map is again broadcasted, for each peer to make their local changes to +// reflect the results of the rewrite. fn session_optimize( cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, deadline: &Option, ) -> Result<(), ConnectError> { - //////////////////////////////////////// use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; - //////////////////////////////////////// log!(cu.logger, "Beginning session optimization"); // populate session_info_map from a message per child let mut unoptimized_map: HashMap = Default::default(); @@ -857,6 +914,7 @@ fn session_optimize( "Gathered all children's maps. ConnectorId set is... {:?}", unoptimized_map.keys() ); + // add my own session info to the map let my_session_info = SessionInfo { port_info: cu.ips.port_info.clone(), proto_components: cu.proto_components.clone(), @@ -871,7 +929,6 @@ fn session_optimize( }; unoptimized_map.insert(cu.ips.id_manager.connector_id, my_session_info); log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map); - // acquire the optimized info... let optimized_map = if let Some(parent) = comm.neighborhood.parent { // ... as a message from my parent @@ -920,25 +977,35 @@ fn session_optimize( comm.neighborhood.children.iter() ); log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map); + // extract my own ConnectorId's entry let optimized_info = optimized_map.get(&cu.ips.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone(); + // broadcast the optimized session info to my children let msg = S(Sm::SessionScatter { optimized_map }); for &child in comm.neighborhood.children.iter() { comm.endpoint_manager.send_to_setup(child, &msg)?; } - apply_optimizations(cu, comm, optimized_info)?; + // apply local optimizations + apply_my_optimizations(cu, comm, optimized_info)?; log!(cu.logger, "Session optimizations applied"); Ok(()) } + +// Defines the optimization function, consuming an optimized map, +// and returning an optimized map. fn leader_session_map_optimize( logger: &mut dyn Logger, unoptimized_map: HashMap, ) -> Result, ConnectError> { log!(logger, "Session map optimize START"); + // currently, it's the identity function log!(logger, "Session map optimize END"); Ok(unoptimized_map) } -fn apply_optimizations( + +// Modify the given connector's internals to reflect +// the given session info +fn apply_my_optimizations( cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, session_info: SessionInfo, @@ -949,7 +1016,7 @@ fn apply_optimizations( serde_proto_description, endpoint_incoming_to_getter, } = session_info; - // TODO some info which should be read-only can be mutated with the current scheme + // simply overwrite the contents cu.ips.port_info = port_info; cu.proto_components = proto_components; cu.proto_description = serde_proto_description.0;