diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 266e9c33be61cf7606ee988fbbd048d77e02a07d..921d409c7b68d2e5b83d5bae2c3a4b2675a06a45 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -5,6 +5,7 @@ use crate::common::*; struct BranchingNative { branches: HashMap, } +#[derive(Clone, Debug)] struct NativeBranch { index: usize, gotten: HashMap, @@ -77,7 +78,8 @@ impl NonsyncProtoContext<'_> { } impl SyncProtoContext<'_> { pub fn is_firing(&mut self, port: PortId) -> Option { - self.predicate.query(port) + let var = self.port_info.firing_var_for(port); + self.predicate.query(var) } pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { self.inbox.get(&port) @@ -116,6 +118,36 @@ impl Connector { } } } + pub fn next_batch(&mut self) -> Result { + // returns index of new batch + use NextBatchError::*; + match &mut self.phased { + ConnectorPhased::Setup { .. } => Err(NotConnected), + ConnectorPhased::Communication { native_batches, .. } => { + native_batches.push(Default::default()); + Ok(native_batches.len() - 1) + } + } + } + pub fn get(&mut self, port: PortId) -> Result<(), PortOpError> { + use PortOpError::*; + if !self.native_ports.contains(&port) { + return Err(PortUnavailable); + } + if Getter != *self.port_info.polarities.get(&port).unwrap() { + return Err(WrongPolarity); + } + match &mut self.phased { + ConnectorPhased::Setup { .. } => Err(NotConnected), + ConnectorPhased::Communication { native_batches, .. } => { + let batch = native_batches.last_mut().unwrap(); + if !batch.to_get.insert(port) { + return Err(MultipleOpsOnPort); + } + Ok(()) + } + } + } pub fn sync(&mut self, timeout: Duration) -> Result { use SyncError::*; match &mut self.phased { @@ -128,7 +160,7 @@ impl Connector { round_result, .. } => { - let _deadline = Instant::now() + timeout; + let deadline = Instant::now() + timeout; let logger: &mut dyn Logger = &mut *self.logger; // 1. run all proto components to Nonsync blockers log!( @@ -189,8 +221,8 @@ impl Connector { branching_proto_components.len(), ); - // NOTE: all msgs in outbox are of form (Putter, Payload) - let mut payload_outbox: Vec<(PortId, SendPayloadMsg)> = vec![]; + // NOTE: all msgs in outbox are of form (Getter, Payload) + let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![]; // create the solution storage let mut solution_storage = { @@ -218,21 +250,25 @@ impl Connector { let mut predicate = Predicate::default(); // assign trues for &port in to_get.iter().chain(to_put.keys()) { - predicate.assigned.insert(port, true); + let var = self.port_info.firing_var_for(port); + predicate.assigned.insert(var, true); } // assign falses for &port in self.native_ports.iter() { - predicate.assigned.entry(port).or_insert(false); + let var = self.port_info.firing_var_for(port); + predicate.assigned.entry(var).or_insert(false); } predicate }; log!(logger, "Native branch {} has pred {:?}", index, &predicate); // put all messages - for (port, payload) in to_put { - let msg = SendPayloadMsg { payload_predicate: predicate.clone(), payload }; + for (putter, payload) in to_put { + let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; log!(logger, "Native branch {} sending msg {:?}", index, &msg); - payload_outbox.push((port, msg)); + // rely on invariant: sync batches respect port polarity + let getter = *self.port_info.peers.get(&putter).unwrap(); + payloads_to_get.push((getter, msg)); } if to_get.is_empty() { log!(logger, "Native submitting trivial solution for index {}", index); @@ -276,6 +312,7 @@ impl Connector { let mut ctx = SyncProtoContext { logger, predicate: &predicate, + port_info: &self.port_info, proto_component_id: *proto_component_id, inbox: &branch.inbox, }; @@ -296,7 +333,8 @@ impl Connector { B::SyncBlockEnd => { // make concrete all variables for &port in proto_component.ports.iter() { - predicate.assigned.entry(port).or_insert(false); + let var = self.port_info.firing_var_for(port); + predicate.assigned.entry(var).or_insert(false); } // submit solution for this component solution_storage.submit_and_digest_subtree_solution( @@ -311,14 +349,15 @@ impl Connector { } B::CouldntReadMsg(port) => { // move to "blocked" - assert!(predicate.query(port).is_none()); + let var = self.port_info.firing_var_for(port); + assert!(predicate.query(var).is_none()); assert!(!branch.inbox.contains_key(&port)); blocked.insert(predicate, branch); } B::CouldntCheckFiring(port) => { // sanity check - assert!(predicate.query(port).is_none()); let var = self.port_info.firing_var_for(port); + assert!(predicate.query(var).is_none()); // keep forks in "unblocked" unblocked_to.insert( predicate.clone().inserted(var, false), @@ -326,24 +365,28 @@ impl Connector { ); unblocked_to.insert(predicate.inserted(var, true), branch); } - B::PutMsg(port, payload) => { + B::PutMsg(putter, payload) => { // sanity check - assert_eq!(Some(&Putter), self.port_info.polarities.get(&port)); + assert_eq!( + Some(&Putter), + self.port_info.polarities.get(&putter) + ); // overwrite assignment - let var = self.port_info.firing_var_for(port); + let var = self.port_info.firing_var_for(putter); let was = predicate.assigned.insert(var, true); if was == Some(false) { - log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, port, var); + log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); // discard forever drop((predicate, branch)); } else { // keep in "unblocked" - log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, port, var); - payload_outbox.push(( - port, + let getter = *self.port_info.peers.get(&putter).unwrap(); + log!(logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); + payloads_to_get.push(( + getter, SendPayloadMsg { - payload_predicate: predicate.clone(), + predicate: predicate.clone(), payload, }, )); @@ -358,19 +401,54 @@ impl Connector { log!(logger, "All proto components are blocked"); log!(logger, "Entering decision loop..."); + endpoint_manager.undelay_all(); let decision = 'undecided: loop { - // check if we already have a solution + // drain payloads_to_get, sending them through endpoints / feeding them to components + while let Some((getter, send_payload_msg)) = payloads_to_get.pop() { + assert!(self.port_info.polarities.get(&getter).copied() == Some(Getter)); + match self.port_info.routes.get(&getter).unwrap() { + Route::Endpoint { index } => { + let msg = Msg::CommMsg(CommMsg { + round_index: *round_index, + contents: CommMsgContents::SendPayload(send_payload_msg), + }); + endpoint_manager.send_to(*index, &msg).unwrap(); + } + Route::LocalComponent(LocalComponentId::Native) => branching_native + .feed_msg( + logger, + &self.port_info, + &mut solution_storage, + getter, + send_payload_msg, + ), + Route::LocalComponent(LocalComponentId::Proto(proto_component_id)) => { + if let Some(branching_component) = + branching_proto_components.get_mut(&proto_component_id) + { + branching_component.feed_msg( + logger, + &self.port_info, + &mut solution_storage, + getter, + send_payload_msg, + ) + } + } + } + } + + // check if we have a solution yet log!(logger, "Check if we have any local decisions..."); for solution in solution_storage.iter_new_local_make_old() { log!(logger, "New local decision with solution {:?}...", &solution); match neighborhood.parent { Some(parent) => { log!(logger, "Forwarding to my parent {:?}", parent); + let suggestion = Decision::Success(solution); let msg = Msg::CommMsg(CommMsg { round_index: *round_index, - contents: CommMsgContents::Elaborate { - partial_oracle: solution, - }, + contents: CommMsgContents::Suggest { suggestion }, }); endpoint_manager.send_to(parent, &msg).unwrap(); } @@ -381,7 +459,109 @@ impl Connector { } } - // TODO send / recv messages + // stuck! make progress by receiving a msg + // try recv messages arriving through endpoints + log!(logger, "No decision yet. Let's recv an endpoint msg..."); + { + let (endpoint_index, msg) = + endpoint_manager.try_recv_any(deadline).unwrap(); + log!(logger, "Received from endpoint {} msg {:?}", endpoint_index, &msg); + let comm_msg_contents = match msg { + Msg::SetupMsg(..) => { + log!(logger, "Discarding setup message; that phase is over"); + continue 'undecided; + } + Msg::CommMsg(comm_msg) => match comm_msg.round_index.cmp(round_index) { + Ordering::Equal => comm_msg.contents, + Ordering::Less => { + log!( + logger, + "We are in round {}, but msg is for round {}. Discard", + comm_msg.round_index, + round_index, + ); + drop(comm_msg); + continue 'undecided; + } + Ordering::Greater => { + log!( + logger, + "We are in round {}, but msg is for round {}. Buffer", + comm_msg.round_index, + round_index, + ); + endpoint_manager + .delayed_messages + .push((endpoint_index, Msg::CommMsg(comm_msg))); + continue 'undecided; + } + }, + }; + match comm_msg_contents { + CommMsgContents::SendPayload(send_payload_msg) => { + let getter = endpoint_manager.endpoint_exts[endpoint_index] + .getter_for_incoming; + assert!(self.port_info.polarities.get(&getter) == Some(&Getter)); + log!( + logger, + "Msg routed to getter port {:?}. Buffer for recv loop", + getter, + ); + payloads_to_get.push((getter, send_payload_msg)); + } + CommMsgContents::Suggest { suggestion } => { + // only accept this control msg through a child endpoint + if neighborhood.children.binary_search(&endpoint_index).is_ok() { + match suggestion { + Decision::Success(predicate) => { + // child solution contributes to local solution + log!( + logger, + "Child provided solution {:?}", + &predicate + ); + let route = Route::Endpoint { index: endpoint_index }; + solution_storage.submit_and_digest_subtree_solution( + logger, route, predicate, + ); + } + Decision::Failure => match neighborhood.parent { + None => { + log!( + logger, + "As sink, I decide on my child's failure" + ); + // I am the sink. Decide on failed + break 'undecided Decision::Failure; + } + Some(parent) => { + log!(logger, "Forwarding failure through my parent endpoint {:?}", parent); + // I've got a parent. Forward the failure suggestion. + let msg = Msg::CommMsg(CommMsg { + round_index: *round_index, + contents: CommMsgContents::Suggest { + suggestion, + }, + }); + endpoint_manager.send_to(parent, &msg).unwrap(); + } + }, + } + } else { + log!(logger, "Discarding suggestion {:?} from non-child endpoint idx {:?}", &suggestion, endpoint_index); + } + } + CommMsgContents::Announce { decision } => { + if Some(endpoint_index) == neighborhood.parent { + // adopt this decision + break 'undecided decision; + } else { + log!(logger, "Discarding announcement {:?} from non-parent endpoint idx {:?}", &decision, endpoint_index); + } + } + } + } + log!(logger, "Endpoint msg recv done"); }; log!(logger, "Committing to decision {:?}!", &decision); @@ -426,6 +606,72 @@ impl Connector { } } impl BranchingNative { + fn feed_msg( + &mut self, + logger: &mut dyn Logger, + port_info: &PortInfo, + solution_storage: &mut SolutionStorage, + getter: PortId, + send_payload_msg: SendPayloadMsg, + ) { + assert!(port_info.polarities.get(&getter).copied() == Some(Getter)); + println!("BEFORE {:#?}", &self.branches); + let mut draining = HashMap::default(); + let finished = &mut self.branches; + std::mem::swap(&mut draining, finished); + for (predicate, mut branch) in draining.drain() { + // check if this branch expects to receive it + let var = port_info.firing_var_for(getter); + let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { + let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); + assert!(was.is_none()); + branch.to_get.remove(&getter); + if branch.to_get.is_empty() { + let route = Route::LocalComponent(LocalComponentId::Native); + solution_storage.submit_and_digest_subtree_solution( + logger, + route, + predicate.clone(), + ); + } + }; + if predicate.query(var) != Some(true) { + // optimization. Don't bother trying this branch + finished.insert(predicate, branch); + continue; + } + use CommonSatResult as Csr; + match predicate.common_satisfier(&send_payload_msg.predicate) { + Csr::Equivalent | Csr::FormerNotLatter => { + // retain the existing predicate, but add this payload + feed_branch(&mut branch, &predicate); + finished.insert(predicate, branch); + } + Csr::Nonexistant => { + // this branch does not receive the message + finished.insert(predicate, branch); + } + Csr::LatterNotFormer => { + // fork branch, give fork the message and payload predicate + let mut branch2 = branch.clone(); + // original branch untouched + finished.insert(predicate, branch); + let predicate2 = send_payload_msg.predicate.clone(); + feed_branch(&mut branch2, &predicate2); + finished.insert(predicate2, branch2); + } + Csr::New(new_predicate) => { + // fork branch, give fork the message and the new predicate + let mut branch2 = branch.clone(); + // original branch untouched + finished.insert(predicate, branch); + feed_branch(&mut branch2, &new_predicate); + finished.insert(new_predicate, branch2); + } + } + } + println!("AFTER {:#?}", &self.branches); + } fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap) { for (branch_predicate, branch) in self.branches { if branch_predicate.satisfies(solution_predicate) { @@ -437,6 +683,16 @@ impl BranchingNative { } } impl BranchingProtoComponent { + fn feed_msg( + &mut self, + _logger: &mut dyn Logger, + _port_info: &PortInfo, + _solution_storage: &mut SolutionStorage, + _getter: PortId, + _send_payload_msg: SendPayloadMsg, + ) { + todo!() + } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; for (branch_predicate, branch) in branches { diff --git a/src/runtime/error.rs b/src/runtime/error.rs index 8b526dad37c198333c59a79c0890956fd121aaad..672b4ac8c23aca6fc14545447c72e10cd9f43360 100644 --- a/src/runtime/error.rs +++ b/src/runtime/error.rs @@ -33,3 +33,8 @@ pub enum GottenError { PortDidntGet, PreviousSyncFailed, } + +#[derive(Debug, Eq, PartialEq)] +pub enum NextBatchError { + NotConnected, +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 2223cf6be13fcbd670fa28299647756c2e8cea80..8dc24e0df55132b0837ee6875636092ed507b449 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -8,6 +8,10 @@ mod my_tests; use crate::common::*; use error::*; +#[derive( + Debug, Copy, Clone, Eq, PartialEq, Ord, Hash, PartialOrd, serde::Serialize, serde::Deserialize, +)] +pub struct FiringVar(PortId); #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] pub enum LocalComponentId { Native, @@ -48,13 +52,12 @@ pub struct CommMsg { #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum CommMsgContents { SendPayload(SendPayloadMsg), - Elaborate { partial_oracle: Predicate }, // SINKWARD - Failure, // SINKWARD - Announce { decision: Decision }, // SINKAWAYS + Suggest { suggestion: Decision }, // SINKWARD + Announce { decision: Decision }, // SINKAWAYS } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct SendPayloadMsg { - payload_predicate: Predicate, + predicate: Predicate, payload: Payload, } #[derive(Debug, PartialEq)] @@ -86,7 +89,7 @@ pub struct EndpointSetup { #[derive(Debug)] pub struct EndpointExt { endpoint: Endpoint, - inp_for_emerging_msgs: PortId, + getter_for_incoming: PortId, } #[derive(Debug)] pub struct Neighborhood { @@ -151,7 +154,7 @@ pub enum ConnectorPhased { pub struct StringLogger(ControllerId, String); #[derive(Default, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] pub struct Predicate { - pub assigned: BTreeMap, + pub assigned: BTreeMap, } #[derive(Debug, Default)] pub struct NativeBatch { @@ -175,6 +178,7 @@ pub struct SyncProtoContext<'a> { logger: &'a mut dyn Logger, predicate: &'a Predicate, proto_component_id: ProtoComponentId, + port_info: &'a PortInfo, inbox: &'a HashMap, } @@ -228,11 +232,11 @@ pub struct SyncProtoContext<'a> { //////////////// impl PortInfo { - fn firing_var_for(&self, port: PortId) -> PortId { - match self.polarities.get(&port).unwrap() { + fn firing_var_for(&self, port: PortId) -> FiringVar { + FiringVar(match self.polarities.get(&port).unwrap() { Getter => port, Putter => *self.peers.get(&port).unwrap(), - } + }) } } impl IdManager { @@ -423,10 +427,9 @@ impl Connector { // f.pad("]") // } // } - impl Predicate { #[inline] - pub fn inserted(mut self, k: PortId, v: bool) -> Self { + pub fn inserted(mut self, k: FiringVar, v: bool) -> Self { self.assigned.insert(k, v); self } @@ -460,7 +463,7 @@ impl Predicate { /// FormerNotLatter, LatterNotFormer and Equivalent are returned respectively. /// otherwise New(N) is returned. pub fn common_satisfier(&self, other: &Self) -> CommonSatResult { - use CommonSatResult::*; + use CommonSatResult as Csr; // iterators over assignments of both predicates. Rely on SORTED ordering of BTreeMap's keys. let [mut s_it, mut o_it] = [self.assigned.iter(), other.assigned.iter()]; let [mut s, mut o] = [s_it.next(), o_it.next()]; @@ -491,7 +494,7 @@ impl Predicate { } else if sb != ob { assert_eq!(sid, oid); // both predicates assign the variable but differ on the value - return Nonexistant; + return Csr::Nonexistant; } else { // both predicates assign the variable to the same value s = s_it.next(); @@ -502,9 +505,9 @@ impl Predicate { } // Observed zero inconsistencies. A unified predicate exists... match [s_not_o.is_empty(), o_not_s.is_empty()] { - [true, true] => Equivalent, // ... equivalent to both. - [false, true] => FormerNotLatter, // ... equivalent to self. - [true, false] => LatterNotFormer, // ... equivalent to other. + [true, true] => Csr::Equivalent, // ... equivalent to both. + [false, true] => Csr::FormerNotLatter, // ... equivalent to self. + [true, false] => Csr::LatterNotFormer, // ... equivalent to other. [false, false] => { // ... which is the union of the predicates' assignments but // is equivalent to neither self nor other. @@ -512,22 +515,22 @@ impl Predicate { for (&id, &b) in o_not_s { new.assigned.insert(id, b); } - New(new) + Csr::New(new) } } } - pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { - self.assigned - .iter() - .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) - } + // pub fn iter_matching(&self, value: bool) -> impl Iterator + '_ { + // self.assigned + // .iter() + // .filter_map(move |(&channel_id, &b)| if b == value { Some(channel_id) } else { None }) + // } - pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator, value: bool) { - for channel_id in channel_ids { - self.assigned.entry(channel_id).or_insert(value); - } - } + // pub fn batch_assign_nones(&mut self, channel_ids: impl Iterator, value: bool) { + // for channel_id in channel_ids { + // self.assigned.entry(channel_id).or_insert(value); + // } + // } // pub fn replace_assignment(&mut self, channel_id: PortId, value: bool) -> Option { // self.assigned.insert(channel_id, value) // } @@ -541,8 +544,8 @@ impl Predicate { } Some(res) } - pub fn query(&self, x: PortId) -> Option { - self.assigned.get(&x).copied() + pub fn query(&self, var: FiringVar) -> Option { + self.assigned.get(&var).copied() } } impl Debug for Predicate { diff --git a/src/runtime/my_tests.rs b/src/runtime/my_tests.rs index b5d4a1ba92e524630d814722151f0cbb0ccecb23..a92311027a731553ac2862b56360e52c47cb2c9d 100644 --- a/src/runtime/my_tests.rs +++ b/src/runtime/my_tests.rs @@ -145,3 +145,67 @@ fn connected_gotten_err_ungotten() { c.sync(Duration::from_secs(1)).unwrap(); assert_eq!(reowolf::error::GottenError::PortDidntGet, c.gotten(i).unwrap_err()); } + +#[test] +fn native_polarity_checks() { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let [o, i] = c.new_port_pair(); + c.connect(Duration::from_secs(1)).unwrap(); + // fail... + c.get(o).unwrap_err(); + c.put(i, (b"hi" as &[_]).into()).unwrap_err(); + // succeed.. + c.get(i).unwrap(); + c.put(o, (b"hi" as &[_]).into()).unwrap(); +} + +#[test] +fn native_multiple_gets() { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let [_, i] = c.new_port_pair(); + c.connect(Duration::from_secs(1)).unwrap(); + c.get(i).unwrap(); + c.get(i).unwrap_err(); +} + +#[test] +fn next_batch() { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + c.next_batch().unwrap_err(); + c.connect(Duration::from_secs(1)).unwrap(); + c.next_batch().unwrap(); + c.next_batch().unwrap(); + c.next_batch().unwrap(); +} + +#[test] +fn native_sync() { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let [o, i] = c.new_port_pair(); + c.connect(Duration::from_secs(1)).unwrap(); + c.get(i).unwrap(); + c.put(o, (b"hi" as &[_]).into()).unwrap(); + c.sync(Duration::from_secs(1)).unwrap(); +} + +#[test] +fn native_message_pass() { + let sock_addr = next_test_addr(); + scope(|s| { + s.spawn(|_| { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 0); + let g = c.new_net_port(Getter, EndpointSetup { sock_addr, is_active: true }).unwrap(); + c.connect(Duration::from_secs(1)).unwrap(); + c.get(g).unwrap(); + c.sync(Duration::from_secs(1)).unwrap(); + }); + s.spawn(|_| { + let mut c = Connector::new_simple(MINIMAL_PROTO.clone(), 1); + let p = c.new_net_port(Putter, EndpointSetup { sock_addr, is_active: false }).unwrap(); + c.connect(Duration::from_secs(1)).unwrap(); + c.put(p, (b"hello" as &[_]).into()).unwrap(); + c.sync(Duration::from_secs(1)).unwrap(); + }); + }) + .unwrap(); +} diff --git a/src/runtime/setup2.rs b/src/runtime/setup2.rs index b3e3000e767f812aa3c417ba9f1a777abaf8016b..ffd21c018fa730bc7d9d1c5c2657baa7aaf14d6c 100644 --- a/src/runtime/setup2.rs +++ b/src/runtime/setup2.rs @@ -96,7 +96,7 @@ impl Connector { Err(()) } ConnectorPhased::Setup { endpoint_setups, .. } => { - log!(self.logger, "~~~ CONNECT called with timeout {:?}", timeout); + log!(self.logger, "~~~ CONNECT called timeout {:?}", timeout); let deadline = Instant::now() + timeout; // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = new_endpoint_manager( @@ -266,12 +266,12 @@ fn new_endpoint_manager( } let endpoint_exts = todos .into_iter() - .map(|Todo { todo_endpoint, recv_peer_port, .. }| EndpointExt { + .map(|Todo { todo_endpoint, local_port, .. }| EndpointExt { endpoint: match todo_endpoint { TodoEndpoint::Endpoint(endpoint) => endpoint, TodoEndpoint::Listener(..) => unreachable!(), }, - inp_for_emerging_msgs: recv_peer_port.unwrap(), + getter_for_incoming: local_port, }) .collect(); Ok(EndpointManager {