diff --git a/.gitignore b/.gitignore index 22e1592296eabd6ea5b831a55b0adf1a86b086cb..fbbfbb28ff59e74d951ba8f7f4d80349eb106f65 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ examples/*/*.exe examples/*.dll examples/reowolf* examples/*.txt -logs +logs/* +logs/*/* diff --git a/Cargo.toml b/Cargo.toml index e629105de36dcf87b4bca2d7812891f476d239a8..7787c0f3a2a3d7b150d7156f5e66ab11b1a4ea34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,4 +36,5 @@ crate-type = ["cdylib"] [features] default = ["ffi"] -ffi = [] # no feature dependencies \ No newline at end of file +ffi = [] # no feature dependencies +endpoint_logging = [] # see src/macros where a conditional check include endpoint logging \ No newline at end of file diff --git a/src/common.rs b/src/common.rs index 21af2700159a7b95ee62aa762d24275b6aadf333..2c816e862dd1bf006eccbb36e16c9d34dfdb78c7 100644 --- a/src/common.rs +++ b/src/common.rs @@ -87,6 +87,7 @@ pub(crate) enum SyncBlocker { CouldntCheckFiring(PortId), PutMsg(PortId, Payload), } +pub(crate) struct DenseDebugHex<'a>(pub &'a [u8]); ///////////////////// IMPL ///////////////////// impl U32Stream { @@ -176,7 +177,7 @@ impl Debug for ProtoComponentId { } impl Debug for Payload { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "Payload{:x?}", self.as_slice()) + write!(f, "Payload[{:?}]", DenseDebugHex(self.as_slice())) } } impl std::ops::Not for Polarity { @@ -189,3 +190,11 @@ impl std::ops::Not for Polarity { } } } +impl Debug for DenseDebugHex<'_> { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + for b in self.0 { + write!(f, "{:02X?}", b)?; + } + Ok(()) + } +} diff --git a/src/macros.rs b/src/macros.rs index 0ead5431e76a9824f31e9ef7dad773a40f431b98..92d83ade6ecb93a88f8995bdbc60df3af0a9415d 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -1,7 +1,9 @@ macro_rules! endptlog { ($logger:expr, $($arg:tt)*) => {{ - // let w = $logger.line_writer(); - // let _ = writeln!(w, $($arg)*); + if cfg!(feature = "endpoint_logging") { + let w = $logger.line_writer(); + let _ = writeln!(w, $($arg)*); + } }}; } macro_rules! log { diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 437a9aec36d61e1676ca79bb93477511582aabf3..da44b68e899e8ced9bd4c9888d6bb8cb2411a12a 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -60,23 +60,23 @@ impl ReplaceBoolTrue for bool { //////////////// impl Connector { pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { - use GottenError::*; + use GottenError as Ge; let Self { phased, .. } = self; match phased { - ConnectorPhased::Setup { .. } => Err(NoPreviousRound), + ConnectorPhased::Setup { .. } => Err(Ge::NoPreviousRound), ConnectorPhased::Communication(comm) => match &comm.round_result { - Err(_) => Err(PreviousSyncFailed), - Ok(None) => Err(NoPreviousRound), - Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(PortDidntGet), + Err(_) => Err(Ge::PreviousSyncFailed), + Ok(None) => Err(Ge::NoPreviousRound), + Ok(Some(round_ok)) => round_ok.gotten.get(&port).ok_or(Ge::PortDidntGet), }, } } pub fn next_batch(&mut self) -> Result { // returns index of new batch - use NextBatchError::*; + use NextBatchError as Nbe; let Self { phased, .. } = self; match phased { - ConnectorPhased::Setup { .. } => Err(NotConnected), + ConnectorPhased::Setup { .. } => Err(Nbe::NotConnected), ConnectorPhased::Communication(comm) => { comm.native_batches.push(Default::default()); Ok(comm.native_batches.len() - 1) @@ -88,18 +88,18 @@ impl Connector { port: PortId, expect_polarity: Polarity, ) -> Result<&mut NativeBatch, PortOpError> { - use PortOpError::*; + use PortOpError as Poe; let Self { unphased, phased } = self; if !unphased.native_ports.contains(&port) { - return Err(PortUnavailable); + return Err(Poe::PortUnavailable); } match unphased.port_info.polarities.get(&port) { Some(p) if *p == expect_polarity => {} - Some(_) => return Err(WrongPolarity), - None => return Err(UnknownPolarity), + Some(_) => return Err(Poe::WrongPolarity), + None => return Err(Poe::UnknownPolarity), } match phased { - ConnectorPhased::Setup { .. } => Err(NotConnected), + ConnectorPhased::Setup { .. } => Err(Poe::NotConnected), ConnectorPhased::Communication(comm) => { let batch = comm.native_batches.last_mut().unwrap(); // length >= 1 is invariant Ok(batch) @@ -107,22 +107,22 @@ impl Connector { } } pub fn put(&mut self, port: PortId, payload: Payload) -> Result<(), PortOpError> { - use PortOpError::*; + use PortOpError as Poe; let batch = self.port_op_access(port, Putter)?; if batch.to_put.contains_key(&port) { - Err(MultipleOpsOnPort) + Err(Poe::MultipleOpsOnPort) } else { batch.to_put.insert(port, payload); Ok(()) } } pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> { - use PortOpError::*; + use PortOpError as Poe; let batch = self.port_op_access(port, Getter)?; if batch.to_get.insert(port) { Ok(()) } else { - Err(MultipleOpsOnPort) + Err(Poe::MultipleOpsOnPort) } } // entrypoint for caller. overwrites round result enum, and returns what happened @@ -423,7 +423,6 @@ impl Connector { Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg( cu, &mut solution_storage, - // &mut Pay getter, &send_payload_msg, ), @@ -432,7 +431,6 @@ impl Connector { branching_proto_components.get_mut(proto_component_id) { let proto_component_id = *proto_component_id; - // let ConnectorUnphased { port_info, proto_description, .. } = cu; branching_component.feed_msg( cu, &mut solution_storage, diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 9ecab8f8f7dccca0cc1263d248e2113af58aff08..3118126f12f8118c6e8219beda332e544c883cc8 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -10,7 +10,6 @@ enum TryRecyAnyError { PollFailed, EndpointError { error: EndpointError, index: usize }, } - ///////////////////// impl Endpoint { fn bincode_opts() -> impl bincode::config::Options { @@ -20,40 +19,52 @@ impl Endpoint { &mut self, logger: &mut dyn Logger, ) -> Result, EndpointError> { - use EndpointError::*; + use EndpointError as Ee; // populate inbox as much as possible + let before_len = self.inbox.len(); 'read_loop: loop { let res = self.stream.read_to_end(&mut self.inbox); - endptlog!(logger, "Stream read to end {:?}", &res); match res { Err(e) if would_block(&e) => break 'read_loop, Ok(0) => break 'read_loop, Ok(_) => (), - Err(_e) => return Err(BrokenEndpoint), + Err(_e) => return Err(Ee::BrokenEndpoint), } } - endptlog!(logger, "Inbox bytes {:x?}", &self.inbox); + endptlog!( + logger, + "Inbox bytes [{:x?}| {:x?}]", + DenseDebugHex(&self.inbox[..before_len]), + DenseDebugHex(&self.inbox[before_len..]), + ); let mut monitored = MonitoredReader::from(&self.inbox[..]); use bincode::config::Options; match Self::bincode_opts().deserialize_from(&mut monitored) { Ok(msg) => { let msg_size = monitored.bytes_read(); self.inbox.drain(0..(msg_size.try_into().unwrap())); + endptlog!( + logger, + "Yielding msg. Inbox len {}-{}=={}: [{:?}]", + self.inbox.len() + msg_size, + msg_size, + self.inbox.len(), + DenseDebugHex(&self.inbox[..]), + ); Ok(Some(msg)) } Err(e) => match *e { bincode::ErrorKind::Io(k) if k.kind() == std::io::ErrorKind::UnexpectedEof => { Ok(None) } - _ => Err(MalformedMessage), + _ => Err(Ee::MalformedMessage), }, } } pub(super) fn send(&mut self, msg: &T) -> Result<(), EndpointError> { use bincode::config::Options; - Self::bincode_opts() - .serialize_into(&mut self.stream, msg) - .map_err(|_| EndpointError::BrokenEndpoint) + use EndpointError as Ee; + Self::bincode_opts().serialize_into(&mut self.stream, msg).map_err(|_| Ee::BrokenEndpoint) } } @@ -74,9 +85,6 @@ impl EndpointManager { ConnectError::EndpointSetupError(endpoint.stream.local_addr().unwrap(), err) }) } - pub(super) fn send_to(&mut self, index: usize, msg: &Msg) -> Result<(), EndpointError> { - self.endpoint_exts[index].endpoint.send(msg) - } pub(super) fn try_recv_any_comms( &mut self, logger: &mut dyn Logger, @@ -185,7 +193,6 @@ impl Read for MonitoredReader { Ok(n) } } - impl Into for SetupMsg { fn into(self) -> Msg { Msg::SetupMsg(self) diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index a3a34022024d40b6bab9efb8005f64af102c02f3..ea26830c70aa06f220aee2c86796a27c151ff5b1 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -93,7 +93,7 @@ enum SetupMsg { struct SessionInfo { serde_proto_description: SerdeProtocolDescription, port_info: PortInfo, - getter_for_incoming: Vec, + endpoint_incoming_to_getter: Vec, proto_components: HashMap, } #[derive(Debug, Clone)] @@ -304,19 +304,19 @@ impl Connector { ports: &[PortId], ) -> Result<(), AddComponentError> { // called by the USER. moves ports owned by the NATIVE - use AddComponentError::*; + use AddComponentError as Ace; // 1. check if this is OK let cu = &mut self.unphased; let polarities = cu.proto_description.component_polarities(identifier)?; if polarities.len() != ports.len() { - return Err(WrongNumberOfParamaters { expected: polarities.len() }); + return Err(Ace::WrongNumberOfParamaters { expected: polarities.len() }); } for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { if !cu.native_ports.contains(port) { - return Err(UnknownPort(*port)); + return Err(Ace::UnknownPort(*port)); } if expected_polarity != *cu.port_info.polarities.get(port).unwrap() { - return Err(WrongPortPolarity { port: *port, expected_polarity }); + return Err(Ace::WrongPortPolarity { port: *port, expected_polarity }); } } // 3. remove ports from old component & update port->route diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index ce8aa92e5852bc1acee49cd198dde65996f59e88..ab12b429d02be2e6aa815d43612319c5744d0efe 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -50,12 +50,12 @@ impl Connector { } } pub fn connect(&mut self, timeout: Option) -> Result<(), ConnectError> { - use ConnectError::*; + use ConnectError as Ce; let Self { unphased: cu, phased } = self; match phased { ConnectorPhased::Communication { .. } => { log!(cu.logger, "Call to connecting in connected state"); - Err(AlreadyConnected) + Err(Ce::AlreadyConnected) } ConnectorPhased::Setup { endpoint_setups, .. } => { log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); @@ -103,7 +103,7 @@ fn new_endpoint_manager( ) -> Result { //////////////////////////////////////////// use std::sync::atomic::AtomicBool; - use ConnectError::*; + use ConnectError as Ce; const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); struct Todo { todo_endpoint: TodoEndpoint, @@ -129,7 +129,7 @@ fn new_endpoint_manager( TodoEndpoint::Endpoint(Endpoint { stream, inbox: vec![] }) } else { let mut listener = TcpListener::bind(endpoint_setup.sock_addr) - .map_err(|_| BindFailed(endpoint_setup.sock_addr))?; + .map_err(|_| Ce::BindFailed(endpoint_setup.sock_addr))?; poll.registry().register(&mut listener, token, BOTH).unwrap(); TodoEndpoint::Accepting(listener) }; @@ -150,7 +150,7 @@ fn new_endpoint_manager( assert!(endpoint_setups.len() < WAKER_TOKEN.0); // using MAX usize as waker token let mut waker_continue_signal: Option> = None; - let mut poll = Poll::new().map_err(|_| PollInitFailed)?; + let mut poll = Poll::new().map_err(|_| Ce::PollInitFailed)?; let mut events = Events::with_capacity(endpoint_setups.len() * 2 + 4); let mut polled_undrained = VecSet::default(); let mut delayed_messages = vec![]; @@ -175,11 +175,11 @@ fn new_endpoint_manager( let mut setup_incomplete: HashSet = (0..todos.len()).collect(); while !setup_incomplete.is_empty() { let remaining = if let Some(deadline) = deadline { - Some(deadline.checked_duration_since(Instant::now()).ok_or(Timeout)?) + Some(deadline.checked_duration_since(Instant::now()).ok_or(Ce::Timeout)?) } else { None }; - poll.poll(&mut events, remaining).map_err(|_| PollFailed)?; + poll.poll(&mut events, remaining).map_err(|_| Ce::PollFailed)?; for event in events.iter() { let token = event.token(); let Token(index) = token; @@ -232,7 +232,7 @@ fn new_endpoint_manager( } Err(_) => { log!(logger, "accept() failure on index {}", index); - return Err(AcceptFailed(listener.local_addr().unwrap())); + return Err(Ce::AcceptFailed(listener.local_addr().unwrap())); } } } @@ -240,7 +240,7 @@ fn new_endpoint_manager( if event.is_error() { if todo.endpoint_setup.endpoint_polarity == EndpointPolarity::Passive { // right now you cannot retry an acceptor. - return Err(AcceptFailed(endpoint.stream.local_addr().unwrap())); + return Err(Ce::AcceptFailed(endpoint.stream.local_addr().unwrap())); } if connect_failed.insert(index) { log!( @@ -288,7 +288,7 @@ fn new_endpoint_manager( endpoint .send(&msg) .map_err(|e| { - EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) + Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) }) .unwrap(); log!(logger, "endpoint[{}] sent msg {:?}", index, &msg); @@ -296,7 +296,7 @@ fn new_endpoint_manager( } if event.is_readable() && todo.recv_peer_port.is_none() { let maybe_msg = endpoint.try_recv(logger).map_err(|e| { - EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) + Ce::EndpointSetupError(endpoint.stream.local_addr().unwrap(), e) })?; if maybe_msg.is_some() && !endpoint.inbox.is_empty() { polled_undrained.insert(index); @@ -386,7 +386,7 @@ fn init_neighborhood( deadline: Option, ) -> Result { //////////////////////////////// - use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*}; + use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; #[derive(Debug)] struct WaveState { parent: Option, @@ -398,7 +398,7 @@ fn init_neighborhood( ws: &WaveState, ) -> Result<(), ConnectError> { awaiting.clear(); - let msg = S(LeaderWave { wave_leader: ws.leader }); + let msg = S(Sm::LeaderWave { wave_leader: ws.leader }); for index in em.index_iter() { if Some(index) != ws.parent { em.send_to_setup(index, &msg)?; @@ -445,7 +445,7 @@ fn init_neighborhood( let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?; log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { - S(LeaderAnnounce { tree_leader }) => { + S(Sm::LeaderAnnounce { tree_leader }) => { let election_result = WaveState { leader: tree_leader, parent: Some(recv_index) }; log!(logger, "Election lost! Result {:?}", &election_result); @@ -453,7 +453,7 @@ fn init_neighborhood( assert_ne!(election_result.leader, connector_id); break 'election election_result; } - S(LeaderWave { wave_leader }) => { + S(Sm::LeaderWave { wave_leader }) => { use Ordering as O; match wave_leader.cmp(&best_wave.leader) { O::Less => log!( @@ -504,12 +504,12 @@ fn init_neighborhood( } } } - msg @ S(YouAreMyParent) | msg @ S(MyPortInfo(_)) => { + msg @ S(Sm::YouAreMyParent) | msg @ S(Sm::MyPortInfo(_)) => { log!(logger, "Endpont {:?} sent unexpected msg! {:?}", recv_index, &msg); - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } - msg @ S(SessionScatter { .. }) - | msg @ S(SessionGather { .. }) + msg @ S(Sm::SessionScatter { .. }) + | msg @ S(Sm::SessionGather { .. }) | msg @ Msg::CommMsg { .. } => { log!(logger, "delaying msg {:?} during election algorithm", msg); em.delayed_messages.push((recv_index, msg)); @@ -523,10 +523,13 @@ fn init_neighborhood( awaiting.clear(); for index in em.index_iter() { if Some(index) == election_result.parent { - em.send_to_setup(index, &S(YouAreMyParent))?; + em.send_to_setup(index, &S(Sm::YouAreMyParent))?; } else { awaiting.insert(index); - em.send_to_setup(index, &S(LeaderAnnounce { tree_leader: election_result.leader }))?; + em.send_to_setup( + index, + &S(Sm::LeaderAnnounce { tree_leader: election_result.leader }), + )?; } } let mut children = vec![]; @@ -536,7 +539,7 @@ fn init_neighborhood( let (recv_index, msg) = em.try_recv_any_setup(logger, deadline)?; log!(logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { - S(LeaderAnnounce { .. }) => { + S(Sm::LeaderAnnounce { .. }) => { // not a child log!( logger, @@ -545,10 +548,10 @@ fn init_neighborhood( children.iter() ); if !awaiting.remove(&recv_index) { - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } } - S(YouAreMyParent) => { + S(Sm::YouAreMyParent) => { if !awaiting.remove(&recv_index) { log!( logger, @@ -556,15 +559,15 @@ fn init_neighborhood( recv_index, children.iter() ); - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } children.push(recv_index); } - msg @ S(MyPortInfo(_)) | msg @ S(LeaderWave { .. }) => { + msg @ S(Sm::MyPortInfo(_)) | msg @ S(Sm::LeaderWave { .. }) => { log!(logger, "discarding old message {:?} during election", msg); } - msg @ S(SessionScatter { .. }) - | msg @ S(SessionGather { .. }) + msg @ S(Sm::SessionScatter { .. }) + | msg @ S(Sm::SessionGather { .. }) | msg @ Msg::CommMsg { .. } => { log!(logger, "delaying msg {:?} during election", msg); em.delayed_messages.push((recv_index, msg)); @@ -584,7 +587,7 @@ fn session_optimize( deadline: Option, ) -> Result<(), ConnectError> { //////////////////////////////////////// - use {ConnectError::*, Msg::SetupMsg as S, SetupMsg::*}; + use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; //////////////////////////////////////// log!(cu.logger, "Beginning session optimization"); // populate session_info_map from a message per child @@ -601,7 +604,7 @@ fn session_optimize( comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { - S(SessionGather { unoptimized_map: child_unoptimized_map }) => { + S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => { if !awaiting.remove(&recv_index) { log!( cu.logger, @@ -609,24 +612,24 @@ fn session_optimize( recv_index, &child_unoptimized_map ); - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } unoptimized_map.extend(child_unoptimized_map.into_iter()); } - msg @ S(YouAreMyParent) - | msg @ S(MyPortInfo(..)) - | msg @ S(LeaderAnnounce { .. }) - | msg @ S(LeaderWave { .. }) => { + msg @ S(Sm::YouAreMyParent) + | msg @ S(Sm::MyPortInfo(..)) + | msg @ S(Sm::LeaderAnnounce { .. }) + | msg @ S(Sm::LeaderWave { .. }) => { log!(cu.logger, "discarding old message {:?} during election", msg); } - msg @ S(SessionScatter { .. }) => { + msg @ S(Sm::SessionScatter { .. }) => { log!( cu.logger, "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!", recv_index, &msg ); - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } msg @ Msg::CommMsg(..) => { log!(cu.logger, "delaying msg {:?} during session optimization", msg); @@ -643,7 +646,7 @@ fn session_optimize( port_info: cu.port_info.clone(), proto_components: cu.proto_components.clone(), serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()), - getter_for_incoming: comm + endpoint_incoming_to_getter: comm .endpoint_manager .endpoint_exts .iter() @@ -657,7 +660,7 @@ fn session_optimize( let optimized_map = if let Some(parent) = comm.neighborhood.parent { // ... as a message from my parent log!(cu.logger, "Forwarding gathered info to parent {:?}", parent); - let msg = S(SessionGather { unoptimized_map }); + let msg = S(Sm::SessionGather { unoptimized_map }); comm.endpoint_manager.send_to_setup(parent, &msg)?; 'scatter_loop: loop { log!( @@ -669,10 +672,10 @@ fn session_optimize( comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { - S(SessionScatter { optimized_map }) => { + S(Sm::SessionScatter { optimized_map }) => { if recv_index != parent { log!(cu.logger, "I expected the scatter from my parent only!"); - return Err(SetupAlgMisbehavior); + return Err(Ce::SetupAlgMisbehavior); } break 'scatter_loop optimized_map; } @@ -680,11 +683,11 @@ fn session_optimize( log!(cu.logger, "delaying msg {:?} during scatter recv", msg); comm.endpoint_manager.delayed_messages.push((recv_index, msg)); } - msg @ S(SessionGather { .. }) - | msg @ S(YouAreMyParent) - | msg @ S(MyPortInfo(..)) - | msg @ S(LeaderAnnounce { .. }) - | msg @ S(LeaderWave { .. }) => { + msg @ S(Sm::SessionGather { .. }) + | msg @ S(Sm::YouAreMyParent) + | msg @ S(Sm::MyPortInfo(..)) + | msg @ S(Sm::LeaderAnnounce { .. }) + | msg @ S(Sm::LeaderWave { .. }) => { log!(cu.logger, "discarding old message {:?} during election", msg); } } @@ -703,7 +706,7 @@ fn session_optimize( log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map); let optimized_info = optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone(); - let msg = S(SessionScatter { optimized_map }); + let msg = S(Sm::SessionScatter { optimized_map }); for &child in comm.neighborhood.children.iter() { comm.endpoint_manager.send_to_setup(child, &msg)?; } @@ -724,12 +727,19 @@ fn apply_optimizations( comm: &mut ConnectorCommunication, session_info: SessionInfo, ) -> Result<(), ConnectError> { - let SessionInfo { proto_components, port_info, serde_proto_description, getter_for_incoming } = - session_info; + let SessionInfo { + proto_components, + port_info, + serde_proto_description, + endpoint_incoming_to_getter, + } = session_info; + // TODO some info which should be read-only can be mutated with the current scheme cu.port_info = port_info; cu.proto_components = proto_components; cu.proto_description = serde_proto_description.0; - for (ee, getter) in comm.endpoint_manager.endpoint_exts.iter_mut().zip(getter_for_incoming) { + for (ee, getter) in + comm.endpoint_manager.endpoint_exts.iter_mut().zip(endpoint_incoming_to_getter) + { ee.getter_for_incoming = getter; } Ok(())