From 3dc3c63ab47f21de7b99a63430f61f6ff66a7559 2020-07-15 17:42:50 From: Christopher Esterhuyse Date: 2020-07-15 17:42:50 Subject: [PATCH] added hierarchical structure to connector fields such that fewer pointers to larger sub-structures must be passed around in tight loops of communication phase. Eg: NonSyncContext and SyncContext have shrunk substantially --- diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index e04bb67a030014ca0e1378042b649b7e3eac4597..dc0dc7997d97aeffbd9b22a2a53ae1290bdaa81a 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -42,10 +42,8 @@ struct BranchingProtoComponent { } #[derive(Debug, Clone)] struct ProtoComponentBranch { - did_put_or_get: HashSet, - inbox: HashMap, state: ComponentState, - untaken_choice: Option, + inner: ProtoComponentBranchInner, ended: bool, } struct CyclicDrainer<'a, K: Eq + Hash, V> { @@ -146,11 +144,11 @@ impl Connector { expect_polarity: Polarity, ) -> Result<&mut NativeBatch, PortOpError> { use PortOpError as Poe; - let Self { unphased, phased } = self; - if !unphased.native_ports.contains(&port) { + let Self { unphased: cu, phased } = self; + if !cu.inner.native_ports.contains(&port) { return Err(Poe::PortUnavailable); } - match unphased.port_info.polarities.get(&port) { + match cu.inner.port_info.polarities.get(&port) { Some(p) if *p == expect_polarity => {} Some(_) => return Err(Poe::WrongPolarity), None => return Err(Poe::UnknownPolarity), @@ -190,7 +188,7 @@ impl Connector { ConnectorPhased::Communication(comm) => { match &comm.round_result { Err(SyncError::Unrecoverable(e)) => { - log!(cu.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e); + log!(cu.inner.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e); return Err(SyncError::Unrecoverable(e.clone())); } _ => {} } @@ -216,7 +214,7 @@ impl Connector { use SyncError as Se; ////////////////////////////////// log!( - cu.logger, + 
cu.inner.logger, "~~~ SYNC called with timeout {:?}; starting round {}", &timeout, comm.round_index @@ -228,20 +226,21 @@ impl Connector { HashMap::::default(); let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); - log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len()); + log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len()); // drains unrun_components, and populates branching_proto_components. while let Some((proto_component_id, mut component)) = unrun_components.pop() { // TODO coalesce fields log!( - cu.logger, + cu.inner.logger, "Nonsync running proto component with ID {:?}. {} to go after this", proto_component_id, unrun_components.len() ); let mut ctx = NonsyncProtoContext { - logger: &mut *cu.logger, - port_info: &mut cu.port_info, - id_manager: &mut cu.id_manager, + cu_inner: &mut cu.inner, + // logger: &mut *cu.inner.logger, + // port_info: &mut cu.inner.port_info, + // id_manager: &mut cu.inner.id_manager, proto_component_id, unrun_components: &mut unrun_components, proto_component_ports: &mut cu @@ -252,7 +251,7 @@ impl Connector { }; let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description); log!( - cu.logger, + cu.inner.logger, "proto component {:?} ran to nonsync blocker {:?}", proto_component_id, &blocker @@ -268,7 +267,7 @@ impl Connector { } } log!( - cu.logger, + cu.inner.logger, "All {} proto components are now done with Nonsync phase", branching_proto_components.len(), ); @@ -288,26 +287,26 @@ impl Connector { .map(|&index| SubtreeId::NetEndpoint { index }); let subtree_id_iter = n.chain(c).chain(e); log!( - cu.logger, + cu.inner.logger, "Children in subtree are: {:?}", subtree_id_iter.clone().collect::>() ); SolutionStorage::new(subtree_id_iter) }, - spec_var_stream: cu.id_manager.new_spec_var_stream(), + spec_var_stream: cu.inner.id_manager.new_spec_var_stream(), getter_buffer: 
Default::default(), deadline: timeout.map(|to| Instant::now() + to), }; - log!(cu.logger, "Round context structure initialized"); + log!(cu.inner.logger, "Round context structure initialized"); // Explore all native branches eagerly. Find solutions, buffer messages, etc. log!( - cu.logger, + cu.inner.logger, "Translating {} native batches into branches...", comm.native_batches.len() ); let native_branch_spec_var = rctx.spec_var_stream.next(); - log!(cu.logger, "Native branch spec var is {:?}", native_branch_spec_var); + log!(cu.inner.logger, "Native branch spec var is {:?}", native_branch_spec_var); let mut branching_native = BranchingNative { branches: Default::default() }; 'native_branches: for ((native_branch, index), branch_spec_val) in comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain()) @@ -319,37 +318,42 @@ impl Connector { let firing_ports: HashSet = to_get.iter().chain(to_put.keys()).copied().collect(); for &port in to_get.iter().chain(to_put.keys()) { - let var = cu.port_info.spec_var_for(port); + let var = cu.inner.port_info.spec_var_for(port); predicate.assigned.insert(var, SpecVal::FIRING); } // assign falses for all silent (not firing) ports - for &port in cu.native_ports.difference(&firing_ports) { - let var = cu.port_info.spec_var_for(port); + for &port in cu.inner.native_ports.difference(&firing_ports) { + let var = cu.inner.port_info.spec_var_for(port); if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { - log!(cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); + log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); continue 'native_branches; } } // this branch is consistent. 
distinguish it with a unique var:val mapping and proceed predicate.inserted(native_branch_spec_var, branch_spec_val) }; - log!(cu.logger, "Native branch index={:?} has consistent {:?}", index, &predicate); + log!( + cu.inner.logger, + "Native branch index={:?} has consistent {:?}", + index, + &predicate + ); // send all outgoing messages (by buffering them) for (putter, payload) in to_put { let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg); + log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg); rctx.getter_buffer.putter_add(cu, putter, msg); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; if branch.is_ended() { log!( - cu.logger, + cu.inner.logger, "Native submitting solution for batch {} with {:?}", index, &predicate ); rctx.solution_storage.submit_and_digest_subtree_solution( - &mut *cu.logger, + &mut *cu.inner.logger, SubtreeId::LocalComponent(ComponentId::Native), predicate.clone(), ); @@ -362,7 +366,8 @@ impl Connector { // restore the invariant: !native_batches.is_empty() comm.native_batches.push(Default::default()); - comm.endpoint_manager.udp_endpoints_round_start(&mut *cu.logger, &mut rctx.spec_var_stream); + comm.endpoint_manager + .udp_endpoints_round_start(&mut *cu.inner.logger, &mut rctx.spec_var_stream); // Call to another big method; keep running this round until a distributed decision is reached let decision = Self::sync_reach_decision( cu, @@ -371,8 +376,8 @@ impl Connector { &mut branching_proto_components, &mut rctx, )?; - log!(cu.logger, "Committing to decision {:?}!", &decision); - comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger, &decision)?; + log!(cu.inner.logger, "Committing to decision {:?}!", &decision); + comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?; // propagate the decision to children let msg = Msg::CommMsg(CommMsg { @@ -382,7 +387,7 @@ impl Connector { 
}), }); log!( - cu.logger, + cu.inner.logger, "Announcing decision {:?} through child endpoints {:?}", &msg, &comm.neighborhood.children @@ -405,15 +410,15 @@ impl Connector { .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), ); log!( - cu.logger, + cu.inner.logger, "End round with (updated) component states {:?}", cu.proto_components.keys() ); // consume native - Ok(Some(branching_native.collapse_with(&mut *cu.logger, &predicate))) + Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate))) } }; - log!(cu.logger, "Sync round ending! Cleaning up"); + log!(cu.inner.logger, "Sync round ending! Cleaning up"); // dropping {solution_storage, payloads_to_get} ret } @@ -427,22 +432,22 @@ impl Connector { ) -> Result { let mut already_requested_failure = false; if branching_native.branches.is_empty() { - log!(cu.logger, "Native starts with no branches! Failure!"); + log!(cu.inner.logger, "Native starts with no branches! Failure!"); match comm.neighborhood.parent { Some(parent) => { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? } else { - log!(cu.logger, "Already requested failure"); + log!(cu.inner.logger, "Already requested failure"); } } None => { - log!(cu.logger, "No parent. Deciding on failure"); + log!(cu.inner.logger, "No parent. 
Deciding on failure"); return Ok(Decision::Failure); } } } - log!(cu.logger, "Done translating native batches into branches"); + log!(cu.inner.logger, "Done translating native batches into branches"); let mut pcb_temps_owner = <[HashMap; 3]>::default(); let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner); @@ -450,7 +455,7 @@ impl Connector { // run all proto components to their sync blocker log!( - cu.logger, + cu.inner.logger, "Running all {} proto components to their sync blocker...", branching_proto_components.len() ); @@ -472,43 +477,47 @@ impl Connector { // swap the blocked branches back std::mem::swap(blocked.0, branches); if branches.is_empty() { - log!(cu.logger, "{:?} has become inconsistent!", proto_component_id); + log!(cu.inner.logger, "{:?} has become inconsistent!", proto_component_id); if let Some(parent) = comm.neighborhood.parent { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? } else { - log!(cu.logger, "Already requested failure"); + log!(cu.inner.logger, "Already requested failure"); } } else { - log!(cu.logger, "As the leader, deciding on timeout"); + log!(cu.inner.logger, "As the leader, deciding on timeout"); return Ok(Decision::Failure); } } } - log!(cu.logger, "All proto components are blocked"); + log!(cu.inner.logger, "All proto components are blocked"); - log!(cu.logger, "Entering decision loop..."); + log!(cu.inner.logger, "Entering decision loop..."); comm.endpoint_manager.undelay_all(); 'undecided: loop { // drain payloads_to_get, sending them through endpoints / feeding them to components - log!(cu.logger, "Decision loop! have {} messages to recv", rctx.getter_buffer.len()); + log!( + cu.inner.logger, + "Decision loop! 
have {} messages to recv", + rctx.getter_buffer.len() + ); while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() { - assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); - let route = cu.port_info.routes.get(&getter); + assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter)); + let route = cu.inner.port_info.routes.get(&getter); log!( - cu.logger, + cu.inner.logger, "Routing msg {:?} to {:?} via {:?}", &send_payload_msg, getter, &route ); match route { - None => log!(cu.logger, "Delivery failed. Physical route unmapped!"), + None => log!(cu.inner.logger, "Delivery failed. Physical route unmapped!"), Some(Route::UdpEndpoint { index }) => { let udp_endpoint_ext = &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[*index]; let SendPayloadMsg { predicate, payload } = send_payload_msg; - log!(cu.logger, "Delivering to udp endpoint index={}", index); + log!(cu.inner.logger, "Delivering to udp endpoint index={}", index); udp_endpoint_ext.outgoing_payloads.insert(predicate, payload); } Some(Route::NetEndpoint { index }) => { @@ -540,7 +549,7 @@ impl Connector { )?; if branching_component.branches.is_empty() { log!( - cu.logger, + cu.inner.logger, "{:?} has become inconsistent!", proto_component_id ); @@ -548,16 +557,16 @@ impl Connector { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? 
} else { - log!(cu.logger, "Already requested failure"); + log!(cu.inner.logger, "Already requested failure"); } } else { - log!(cu.logger, "As the leader, deciding on timeout"); + log!(cu.inner.logger, "As the leader, deciding on timeout"); return Ok(Decision::Failure); } } } else { log!( - cu.logger, + cu.inner.logger, "Delivery to getter {:?} msg {:?} failed because {:?} isn't here", getter, &send_payload_msg, @@ -569,12 +578,12 @@ impl Connector { } // check if we have a solution yet - log!(cu.logger, "Check if we have any local decisions..."); + log!(cu.inner.logger, "Check if we have any local decisions..."); for solution in rctx.solution_storage.iter_new_local_make_old() { - log!(cu.logger, "New local decision with solution {:?}...", &solution); + log!(cu.inner.logger, "New local decision with solution {:?}...", &solution); match comm.neighborhood.parent { Some(parent) => { - log!(cu.logger, "Forwarding to my parent {:?}", parent); + log!(cu.inner.logger, "Forwarding to my parent {:?}", parent); let suggestion = Decision::Success(solution); let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, @@ -585,7 +594,7 @@ impl Connector { comm.endpoint_manager.send_to_comms(parent, &msg)?; } None => { - log!(cu.logger, "No parent. Deciding on solution {:?}", &solution); + log!(cu.inner.logger, "No parent. Deciding on solution {:?}", &solution); return Ok(Decision::Success(solution)); } } @@ -593,35 +602,36 @@ impl Connector { // stuck! make progress by receiving a msg // try recv messages arriving through endpoints - log!(cu.logger, "No decision yet. Let's recv an endpoint msg..."); + log!(cu.inner.logger, "No decision yet. Let's recv an endpoint msg..."); { - let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) = - match comm.endpoint_manager.try_recv_any_comms( - &mut *cu.logger, - &cu.port_info, - rctx, - comm.round_index, - )? 
{ - CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg), - CommRecvOk::NewPayloadMsgs => continue 'undecided, - CommRecvOk::TimeoutWithoutNew => { - log!(cu.logger, "Reached user-defined deadling without decision..."); - if let Some(parent) = comm.neighborhood.parent { - if already_requested_failure.replace_with_true() { - Self::request_failure(cu, comm, parent)? - } else { - log!(cu.logger, "Already requested failure"); - } + let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm + .endpoint_manager + .try_recv_any_comms( + &mut *cu.inner.logger, + &cu.inner.port_info, + rctx, + comm.round_index, + )? { + CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg), + CommRecvOk::NewPayloadMsgs => continue 'undecided, + CommRecvOk::TimeoutWithoutNew => { + log!(cu.inner.logger, "Reached user-defined deadling without decision..."); + if let Some(parent) = comm.neighborhood.parent { + if already_requested_failure.replace_with_true() { + Self::request_failure(cu, comm, parent)? 
} else { - log!(cu.logger, "As the leader, deciding on timeout"); - return Ok(Decision::Failure); + log!(cu.inner.logger, "Already requested failure"); } - rctx.deadline = None; - continue 'undecided; + } else { + log!(cu.inner.logger, "As the leader, deciding on timeout"); + return Ok(Decision::Failure); } - }; + rctx.deadline = None; + continue 'undecided; + } + }; log!( - cu.logger, + cu.inner.logger, "Received from endpoint {} ctrl msg {:?}", net_index, &comm_ctrl_msg @@ -633,10 +643,14 @@ impl Connector { match suggestion { Decision::Success(predicate) => { // child solution contributes to local solution - log!(cu.logger, "Child provided solution {:?}", &predicate); + log!( + cu.inner.logger, + "Child provided solution {:?}", + &predicate + ); let subtree_id = SubtreeId::NetEndpoint { index: net_index }; rctx.solution_storage.submit_and_digest_subtree_solution( - &mut *cu.logger, + &mut *cu.inner.logger, subtree_id, predicate, ); @@ -644,15 +658,15 @@ impl Connector { Decision::Failure => { match comm.neighborhood.parent { None => { - log!(cu.logger, "I decide on my child's failure"); + log!(cu.inner.logger, "I decide on my child's failure"); break 'undecided Ok(Decision::Failure); } Some(parent) => { - log!(cu.logger, "Forwarding failure through my parent endpoint {:?}", parent); + log!(cu.inner.logger, "Forwarding failure through my parent endpoint {:?}", parent); if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? 
} else { - log!(cu.logger, "Already requested failure"); + log!(cu.inner.logger, "Already requested failure"); } } } @@ -660,7 +674,7 @@ impl Connector { } } else { log!( - cu.logger, + cu.inner.logger, "Discarding suggestion {:?} from non-child endpoint idx {:?}", &suggestion, net_index @@ -673,7 +687,7 @@ impl Connector { return Ok(decision); } else { log!( - cu.logger, + cu.inner.logger, "Discarding announcement {:?} from non-parent endpoint idx {:?}", &decision, net_index @@ -682,7 +696,7 @@ impl Connector { } } } - log!(cu.logger, "Endpoint msg recv done"); + log!(cu.inner.logger, "Endpoint msg recv done"); } } fn request_failure( @@ -690,7 +704,7 @@ impl Connector { comm: &mut ConnectorCommunication, parent: usize, ) -> Result<(), UnrecoverableSyncError> { - log!(cu.logger, "Forwarding to my parent {:?}", parent); + log!(cu.inner.logger, "Forwarding to my parent {:?}", parent); let suggestion = Decision::Failure; let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, @@ -713,35 +727,35 @@ impl BranchingNative { send_payload_msg: &SendPayloadMsg, bn_temp: MapTempGuard<'_, Predicate, NativeBranch>, ) { - log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg); - assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); + log!(cu.inner.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg); + assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter)); let mut draining = bn_temp; let finished = &mut self.branches; std::mem::swap(draining.0, finished); for (predicate, mut branch) in draining.drain() { - log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate); + log!(cu.inner.logger, "visiting native branch {:?} with {:?}", &branch, &predicate); // check if this branch expects to receive it - let var = cu.port_info.spec_var_for(getter); + let var = cu.inner.port_info.spec_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { 
branch.to_get.remove(&getter); let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); assert!(was.is_none()); if branch.is_ended() { log!( - cu.logger, + cu.inner.logger, "new native solution with {:?} is_ended() with gotten {:?}", &predicate, &branch.gotten ); let subtree_id = SubtreeId::LocalComponent(ComponentId::Native); round_ctx.solution_storage.submit_and_digest_subtree_solution( - &mut *cu.logger, + &mut *cu.inner.logger, subtree_id, predicate.clone(), ); } else { log!( - cu.logger, + cu.inner.logger, "Fed native {:?} still has to_get {:?}", &predicate, &branch.to_get @@ -751,7 +765,7 @@ impl BranchingNative { if predicate.query(var) != Some(SpecVal::FIRING) { // optimization. Don't bother trying this branch log!( - cu.logger, + cu.inner.logger, "skipping branch with {:?} that doesn't want the message (fastpath)", &predicate ); @@ -763,7 +777,7 @@ impl BranchingNative { Aur::Nonexistant => { // this branch does not receive the message log!( - cu.logger, + cu.inner.logger, "skipping branch with {:?} that doesn't want the message (slowpath)", &predicate ); @@ -772,7 +786,7 @@ impl BranchingNative { Aur::Equivalent | Aur::FormerNotLatter => { // retain the existing predicate, but add this payload feed_branch(&mut branch, &predicate); - log!(cu.logger, "branch pred covers it! Accept the msg"); + log!(cu.inner.logger, "branch pred covers it! Accept the msg"); Self::insert_branch_merging(finished, predicate, branch); } Aur::LatterNotFormer => { @@ -781,7 +795,7 @@ impl BranchingNative { let predicate2 = send_payload_msg.predicate.clone(); feed_branch(&mut branch2, &predicate2); log!( - cu.logger, + cu.inner.logger, "payload pred {:?} covers branch pred {:?}", &predicate2, &predicate @@ -794,7 +808,7 @@ impl BranchingNative { let mut branch2 = branch.clone(); feed_branch(&mut branch2, &predicate2); log!( - cu.logger, + cu.inner.logger, "new subsuming pred created {:?}. 
forking and feeding", &predicate2 ); @@ -864,16 +878,13 @@ impl BranchingProtoComponent { ) -> Result<(), UnrecoverableSyncError> { cd.cyclic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { - untaken_choice: &mut branch.untaken_choice, - logger: &mut *cu.logger, + cu_inner: &mut cu.inner, predicate: &predicate, - port_info: &cu.port_info, - inbox: &branch.inbox, - did_put_or_get: &mut branch.did_put_or_get, + branch_inner: &mut branch.inner, }; let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description); log!( - cu.logger, + cu.inner.logger, "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", proto_component_id, &predicate, @@ -886,7 +897,7 @@ impl BranchingProtoComponent { for val in SpecVal::iter_domain().take(n as usize) { let pred = predicate.clone().inserted(var, val); let mut branch_n = branch.clone(); - branch_n.untaken_choice = Some(val.0); + branch_n.inner.untaken_choice = Some(val.0); drainer.add_input(pred, branch_n); } } @@ -897,12 +908,12 @@ impl BranchingProtoComponent { B::SyncBlockEnd => { // make concrete all variables for port in ports.iter() { - let var = cu.port_info.spec_var_for(*port); - let should_have_fired = branch.did_put_or_get.contains(port); + let var = cu.inner.port_info.spec_var_for(*port); + let should_have_fired = branch.inner.did_put_or_get.contains(port); let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); let did_fire = val == SpecVal::FIRING; if did_fire != should_have_fired { - log!(cu.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); + log!(cu.inner.logger, "Inconsistent wrt. 
port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); // IMPLICIT inconsistency drop((predicate, branch)); return Ok(()); @@ -911,7 +922,7 @@ impl BranchingProtoComponent { // submit solution for this component let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id)); rctx.solution_storage.submit_and_digest_subtree_solution( - &mut *cu.logger, + &mut *cu.inner.logger, subtree_id, predicate.clone(), ); @@ -921,12 +932,12 @@ impl BranchingProtoComponent { } B::CouldntReadMsg(port) => { // move to "blocked" - assert!(!branch.inbox.contains_key(&port)); + assert!(!branch.inner.inbox.contains_key(&port)); drainer.add_output(predicate, branch); } B::CouldntCheckFiring(port) => { // sanity check - let var = cu.port_info.spec_var_for(port); + let var = cu.inner.port_info.spec_var_for(port); assert!(predicate.query(var).is_none()); // keep forks in "unblocked" drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone()); @@ -934,18 +945,18 @@ impl BranchingProtoComponent { } B::PutMsg(putter, payload) => { // sanity check - assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter)); + assert_eq!(Some(&Putter), cu.inner.port_info.polarities.get(&putter)); // overwrite assignment - let var = cu.port_info.spec_var_for(putter); + let var = cu.inner.port_info.spec_var_for(putter); let was = predicate.assigned.insert(var, SpecVal::FIRING); if was == Some(SpecVal::SILENT) { - log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); + log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). 
inconsistent!", proto_component_id, putter, var); // discard forever drop((predicate, branch)); } else { // keep in "unblocked" - branch.did_put_or_get.insert(putter); - log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); + branch.inner.did_put_or_get.insert(putter); + log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; rctx.getter_buffer.putter_add(cu, putter, msg); drainer.add_input(predicate, branch); @@ -974,7 +985,7 @@ impl BranchingProtoComponent { send_payload_msg: &SendPayloadMsg, pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>, ) -> Result<(), UnrecoverableSyncError> { - let logger = &mut *cu.logger; + let logger = &mut *cu.inner.logger; log!( logger, "feeding proto component {:?} getter {:?} {:?}", @@ -1039,7 +1050,7 @@ impl BranchingProtoComponent { )?; // swap the blocked branches back std::mem::swap(blocked.0, branches); - log!(cu.logger, "component settles down with branches: {:?}", branches.keys()); + log!(cu.inner.logger, "component settles down with branches: {:?}", branches.keys()); Ok(()) } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { @@ -1053,13 +1064,7 @@ impl BranchingProtoComponent { panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); } fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { - let branch = ProtoComponentBranch { - inbox: Default::default(), - did_put_or_get: Default::default(), - state, - ended: false, - untaken_choice: None, - }; + let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false }; Self { ports, branches: hashmap! 
{ Predicate::default() => branch } } } } @@ -1172,44 +1177,44 @@ impl GetterBuffer { self.getters_and_sends.push((getter, msg)); } fn putter_add(&mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) { - if let Some(&getter) = cu.port_info.peers.get(&putter) { + if let Some(&getter) = cu.inner.port_info.peers.get(&putter) { self.getter_add(getter, msg); } else { - log!(cu.logger, "Putter {:?} has no known peer!", putter); + log!(cu.inner.logger, "Putter {:?} has no known peer!", putter); panic!("Putter {:?} has no known peer!"); } } } impl SyncProtoContext<'_> { pub(crate) fn is_firing(&mut self, port: PortId) -> Option { - let var = self.port_info.spec_var_for(port); + let var = self.cu_inner.port_info.spec_var_for(port); self.predicate.query(var).map(SpecVal::is_firing) } pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { - self.did_put_or_get.insert(port); - self.inbox.get(&port) + self.branch_inner.did_put_or_get.insert(port); + self.branch_inner.inbox.get(&port) } pub(crate) fn take_choice(&mut self) -> Option { - self.untaken_choice.take() + self.branch_inner.untaken_choice.take() } } impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { fn add_input(&mut self, k: K, v: V) { self.swap.insert(k, v); } - fn merge_input_with V>(&mut self, k: K, v: V, mut func: F) { - use std::collections::hash_map::Entry; - let e = self.swap.entry(k); - match e { - Entry::Vacant(ev) => { - ev.insert(v); - } - Entry::Occupied(mut eo) => { - let old = eo.get_mut(); - *old = func(v, old); - } - } - } + // fn merge_input_with V>(&mut self, k: K, v: V, mut func: F) { + // use std::collections::hash_map::Entry; + // let e = self.swap.entry(k); + // match e { + // Entry::Vacant(ev) => { + // ev.insert(v); + // } + // Entry::Occupied(mut eo) => { + // let old = eo.get_mut(); + // *old = func(v, old); + // } + // } + // } fn add_output(&mut self, k: K, v: V) { self.output.insert(k, v); } @@ -1219,7 +1224,7 @@ impl NonsyncProtoContext<'_> { // 
called by a PROTO COMPONENT. moves its own ports. // 1. sanity check: this component owns these ports log!( - self.logger, + self.cu_inner.logger, "Component {:?} added new component with state {:?}, moving ports {:?}", self.proto_component_id, &state, @@ -1227,29 +1232,33 @@ impl NonsyncProtoContext<'_> { ); assert!(self.proto_component_ports.is_subset(&moved_ports)); // 2. remove ports from old component & update port->route - let new_id = self.id_manager.new_proto_component_id(); + let new_id = self.cu_inner.id_manager.new_proto_component_id(); for port in moved_ports.iter() { self.proto_component_ports.remove(port); - self.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id))); + self.cu_inner + .port_info + .routes + .insert(*port, Route::LocalComponent(ComponentId::Proto(new_id))); } // 3. create a new component self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); } pub fn new_port_pair(&mut self) -> [PortId; 2] { // adds two new associated ports, related to each other, and exposed to the proto component - let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; + let [o, i] = + [self.cu_inner.id_manager.new_port_id(), self.cu_inner.id_manager.new_port_id()]; self.proto_component_ports.insert(o); self.proto_component_ports.insert(i); // {polarity, peer, route} known. {} unknown. 
- self.port_info.polarities.insert(o, Putter); - self.port_info.polarities.insert(i, Getter); - self.port_info.peers.insert(o, i); - self.port_info.peers.insert(i, o); + self.cu_inner.port_info.polarities.insert(o, Putter); + self.cu_inner.port_info.polarities.insert(i, Getter); + self.cu_inner.port_info.peers.insert(o, i); + self.cu_inner.port_info.peers.insert(i, o); let route = Route::LocalComponent(ComponentId::Proto(self.proto_component_id)); - self.port_info.routes.insert(o, route); - self.port_info.routes.insert(i, route); + self.cu_inner.port_info.routes.insert(o, route); + self.cu_inner.port_info.routes.insert(i, route); log!( - self.logger, + self.cu_inner.logger, "Component {:?} port pair (out->in) {:?} -> {:?}", self.proto_component_id, o, @@ -1260,7 +1269,7 @@ impl NonsyncProtoContext<'_> { } impl ProtoComponentBranch { fn feed_msg(&mut self, getter: PortId, payload: Payload) { - let was = self.inbox.insert(getter, payload); + let was = self.inner.inbox.insert(getter, payload); assert!(was.is_none()) } } diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index bfd91f31a6e551566f2b7ea929a2e7c20aba7206..fab028f96ff54f15b1f9b8087495042b29d0c01d 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -30,20 +30,22 @@ pub struct DummyLogger; #[derive(Debug)] pub struct FileLogger(ConnectorId, std::fs::File); pub(crate) struct NonsyncProtoContext<'a> { - logger: &'a mut dyn Logger, - proto_component_id: ProtoComponentId, - port_info: &'a mut PortInfo, - id_manager: &'a mut IdManager, + cu_inner: &'a mut ConnectorUnphasedInner, proto_component_ports: &'a mut HashSet, unrun_components: &'a mut Vec<(ProtoComponentId, ProtoComponent)>, + proto_component_id: ProtoComponentId, } pub(crate) struct SyncProtoContext<'a> { - logger: &'a mut dyn Logger, - did_put_or_get: &'a mut HashSet, - untaken_choice: &'a mut Option, + cu_inner: &'a mut ConnectorUnphasedInner, + branch_inner: &'a mut ProtoComponentBranchInner, predicate: &'a Predicate, - port_info: &'a 
PortInfo, - inbox: &'a HashMap, +} + +#[derive(Default, Debug, Clone)] +struct ProtoComponentBranchInner { + untaken_choice: Option, + did_put_or_get: HashSet, + inbox: HashMap, } #[derive( @@ -232,6 +234,10 @@ struct ConnectorCommunication { struct ConnectorUnphased { proto_description: Arc, proto_components: HashMap, + inner: ConnectorUnphasedInner, +} +#[derive(Debug)] +struct ConnectorUnphasedInner { logger: Box, id_manager: IdManager, native_ports: HashSet, @@ -372,7 +378,7 @@ impl IdManager { } impl Drop for Connector { fn drop(&mut self) { - log!(&mut *self.unphased.logger, "Connector dropping. Goodbye!"); + log!(&mut *self.unphased.inner.logger, "Connector dropping. Goodbye!"); } } impl Connector { @@ -387,27 +393,27 @@ impl Connector { } } pub fn swap_logger(&mut self, mut new_logger: Box) -> Box { - std::mem::swap(&mut self.unphased.logger, &mut new_logger); + std::mem::swap(&mut self.unphased.inner.logger, &mut new_logger); new_logger } pub fn get_logger(&mut self) -> &mut dyn Logger { - &mut *self.unphased.logger + &mut *self.unphased.inner.logger } pub fn new_port_pair(&mut self) -> [PortId; 2] { let cu = &mut self.unphased; // adds two new associated ports, related to each other, and exposed to the native - let [o, i] = [cu.id_manager.new_port_id(), cu.id_manager.new_port_id()]; - cu.native_ports.insert(o); - cu.native_ports.insert(i); + let [o, i] = [cu.inner.id_manager.new_port_id(), cu.inner.id_manager.new_port_id()]; + cu.inner.native_ports.insert(o); + cu.inner.native_ports.insert(i); // {polarity, peer, route} known. {} unknown. 
- cu.port_info.polarities.insert(o, Putter); - cu.port_info.polarities.insert(i, Getter); - cu.port_info.peers.insert(o, i); - cu.port_info.peers.insert(i, o); + cu.inner.port_info.polarities.insert(o, Putter); + cu.inner.port_info.polarities.insert(i, Getter); + cu.inner.port_info.peers.insert(o, i); + cu.inner.port_info.peers.insert(i, o); let route = Route::LocalComponent(ComponentId::Native); - cu.port_info.routes.insert(o, route); - cu.port_info.routes.insert(i, route); - log!(cu.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); + cu.inner.port_info.routes.insert(o, route); + cu.inner.port_info.routes.insert(i, route); + log!(cu.inner.logger, "Added port pair (out->in) {:?} -> {:?}", o, i); [o, i] } pub fn add_component( @@ -424,19 +430,22 @@ impl Connector { return Err(Ace::WrongNumberOfParamaters { expected: polarities.len() }); } for (&expected_polarity, port) in polarities.iter().zip(ports.iter()) { - if !cu.native_ports.contains(port) { + if !cu.inner.native_ports.contains(port) { return Err(Ace::UnknownPort(*port)); } - if expected_polarity != *cu.port_info.polarities.get(port).unwrap() { + if expected_polarity != *cu.inner.port_info.polarities.get(port).unwrap() { return Err(Ace::WrongPortPolarity { port: *port, expected_polarity }); } } // 3. remove ports from old component & update port->route - let new_id = cu.id_manager.new_proto_component_id(); + let new_id = cu.inner.id_manager.new_proto_component_id(); for port in ports.iter() { - cu.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id))); + cu.inner + .port_info + .routes + .insert(*port, Route::LocalComponent(ComponentId::Proto(new_id))); } - cu.native_ports.retain(|port| !ports.contains(port)); + cu.inner.native_ports.retain(|port| !ports.contains(port)); // 4. 
add new component cu.proto_components.insert( new_id, diff --git a/src/runtime/setup.rs b/src/runtime/setup.rs index 35c2f1f5ace3c6f4119793f9400dc2621f9fc173..95db7fb554a1569e2b5077733c8639824d15cd06 100644 --- a/src/runtime/setup.rs +++ b/src/runtime/setup.rs @@ -13,10 +13,12 @@ impl Connector { unphased: ConnectorUnphased { proto_description, proto_components: Default::default(), - logger, - id_manager: IdManager::new(connector_id), - native_ports: Default::default(), - port_info: Default::default(), + inner: ConnectorUnphasedInner { + logger, + id_manager: IdManager::new(connector_id), + native_ports: Default::default(), + port_info: Default::default(), + }, }, phased: ConnectorPhased::Setup(Box::new(ConnectorSetup { net_endpoint_setups: Default::default(), @@ -35,22 +37,22 @@ impl Connector { ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { let udp_index = setup.udp_endpoint_setups.len(); - let mut npid = || cu.id_manager.new_port_id(); + let mut npid = || cu.inner.id_manager.new_port_id(); let [nin, nout, uin, uout] = [npid(), npid(), npid(), npid()]; - cu.native_ports.insert(nin); - cu.native_ports.insert(nout); - cu.port_info.polarities.insert(nin, Getter); - cu.port_info.polarities.insert(nout, Putter); - cu.port_info.polarities.insert(uin, Getter); - cu.port_info.polarities.insert(uout, Putter); - cu.port_info.peers.insert(nin, uout); - cu.port_info.peers.insert(nout, uin); - cu.port_info.peers.insert(uin, nout); - cu.port_info.peers.insert(uout, nin); - cu.port_info.routes.insert(nin, Route::LocalComponent(ComponentId::Native)); - cu.port_info.routes.insert(nout, Route::LocalComponent(ComponentId::Native)); - cu.port_info.routes.insert(uin, Route::UdpEndpoint { index: udp_index }); - cu.port_info.routes.insert(uout, Route::UdpEndpoint { index: udp_index }); + cu.inner.native_ports.insert(nin); + cu.inner.native_ports.insert(nout); + cu.inner.port_info.polarities.insert(nin, Getter); + 
cu.inner.port_info.polarities.insert(nout, Putter); + cu.inner.port_info.polarities.insert(uin, Getter); + cu.inner.port_info.polarities.insert(uout, Putter); + cu.inner.port_info.peers.insert(nin, uout); + cu.inner.port_info.peers.insert(nout, uin); + cu.inner.port_info.peers.insert(uin, nout); + cu.inner.port_info.peers.insert(uout, nin); + cu.inner.port_info.routes.insert(nin, Route::LocalComponent(ComponentId::Native)); + cu.inner.port_info.routes.insert(nout, Route::LocalComponent(ComponentId::Native)); + cu.inner.port_info.routes.insert(uin, Route::UdpEndpoint { index: udp_index }); + cu.inner.port_info.routes.insert(uout, Route::UdpEndpoint { index: udp_index }); setup.udp_endpoint_setups.push(UdpEndpointSetup { local_addr, peer_addr, @@ -70,13 +72,16 @@ impl Connector { match phased { ConnectorPhased::Communication(..) => Err(WrongStateError), ConnectorPhased::Setup(setup) => { - let local_port = cu.id_manager.new_port_id(); - cu.native_ports.insert(local_port); + let local_port = cu.inner.id_manager.new_port_id(); + cu.inner.native_ports.insert(local_port); // {polarity, route} known. {peer} unknown. - cu.port_info.polarities.insert(local_port, polarity); - cu.port_info.routes.insert(local_port, Route::LocalComponent(ComponentId::Native)); + cu.inner.port_info.polarities.insert(local_port, polarity); + cu.inner + .port_info + .routes + .insert(local_port, Route::LocalComponent(ComponentId::Native)); log!( - cu.logger, + cu.inner.logger, "Added net port {:?} with polarity {:?} addr {:?} endpoint_polarity {:?}", local_port, polarity, @@ -97,33 +102,33 @@ impl Connector { let Self { unphased: cu, phased } = self; match &phased { ConnectorPhased::Communication { .. 
} => { - log!(cu.logger, "Call to connecting in connected state"); + log!(cu.inner.logger, "Call to connecting in connected state"); Err(Ce::AlreadyConnected) } ConnectorPhased::Setup(setup) => { - log!(cu.logger, "~~~ CONNECT called timeout {:?}", timeout); + log!(cu.inner.logger, "~~~ CONNECT called timeout {:?}", timeout); let deadline = timeout.map(|to| Instant::now() + to); // connect all endpoints in parallel; send and receive peer ids through ports let mut endpoint_manager = new_endpoint_manager( - &mut *cu.logger, + &mut *cu.inner.logger, &setup.net_endpoint_setups, &setup.udp_endpoint_setups, - &mut cu.port_info, + &mut cu.inner.port_info, &deadline, )?; log!( - cu.logger, + cu.inner.logger, "Successfully connected {} endpoints", endpoint_manager.net_endpoint_store.endpoint_exts.len() ); // leader election and tree construction let neighborhood = init_neighborhood( - cu.id_manager.connector_id, - &mut *cu.logger, + cu.inner.id_manager.connector_id, + &mut *cu.inner.logger, &mut endpoint_manager, &deadline, )?; - log!(cu.logger, "Successfully created neighborhood {:?}", &neighborhood); + log!(cu.inner.logger, "Successfully created neighborhood {:?}", &neighborhood); let mut comm = ConnectorCommunication { round_index: 0, endpoint_manager, @@ -134,7 +139,7 @@ impl Connector { if cfg!(feature = "session_optimization") { session_optimize(cu, &mut comm, &deadline)?; } - log!(cu.logger, "connect() finished. setup phase complete"); + log!(cu.inner.logger, "connect() finished. 
setup phase complete"); self.phased = ConnectorPhased::Communication(Box::new(comm)); Ok(()) } @@ -736,25 +741,25 @@ fn session_optimize( //////////////////////////////////////// use {ConnectError as Ce, Msg::SetupMsg as S, SetupMsg as Sm}; //////////////////////////////////////// - log!(cu.logger, "Beginning session optimization"); + log!(cu.inner.logger, "Beginning session optimization"); // populate session_info_map from a message per child let mut unoptimized_map: HashMap = Default::default(); let mut awaiting: HashSet = comm.neighborhood.children.iter().copied().collect(); comm.endpoint_manager.undelay_all(); while !awaiting.is_empty() { log!( - cu.logger, + cu.inner.logger, "Session gather loop. awaiting info from children {:?}...", awaiting.iter() ); let (recv_index, msg) = - comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; - log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + comm.endpoint_manager.try_recv_any_setup(&mut *cu.inner.logger, deadline)?; + log!(cu.inner.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { S(Sm::SessionGather { unoptimized_map: child_unoptimized_map }) => { if !awaiting.remove(&recv_index) { log!( - cu.logger, + cu.inner.logger, "Wasn't expecting session info from {:?}. Got {:?}", recv_index, &child_unoptimized_map @@ -767,11 +772,11 @@ fn session_optimize( | msg @ S(Sm::MyPortInfo(..)) | msg @ S(Sm::LeaderAnnounce { .. }) | msg @ S(Sm::LeaderWave { .. }) => { - log!(cu.logger, "discarding old message {:?} during election", msg); + log!(cu.inner.logger, "discarding old message {:?} during election", msg); } msg @ S(Sm::SessionScatter { .. }) => { log!( - cu.logger, + cu.inner.logger, "Endpoint {:?} sent unexpected scatter! {:?} I've not contributed yet!", recv_index, &msg @@ -779,18 +784,18 @@ fn session_optimize( return Err(Ce::SetupAlgMisbehavior); } msg @ Msg::CommMsg(..) 
=> { - log!(cu.logger, "delaying msg {:?} during session optimization", msg); + log!(cu.inner.logger, "delaying msg {:?} during session optimization", msg); comm.endpoint_manager.delayed_messages.push((recv_index, msg)); } } } log!( - cu.logger, + cu.inner.logger, "Gathered all children's maps. ConnectorId set is... {:?}", unoptimized_map.keys() ); let my_session_info = SessionInfo { - port_info: cu.port_info.clone(), + port_info: cu.inner.port_info.clone(), proto_components: cu.proto_components.clone(), serde_proto_description: SerdeProtocolDescription(cu.proto_description.clone()), endpoint_incoming_to_getter: comm @@ -801,34 +806,38 @@ fn session_optimize( .map(|ee| ee.getter_for_incoming) .collect(), }; - unoptimized_map.insert(cu.id_manager.connector_id, my_session_info); - log!(cu.logger, "Inserting my own info. Unoptimized subtree map is {:?}", &unoptimized_map); + unoptimized_map.insert(cu.inner.id_manager.connector_id, my_session_info); + log!( + cu.inner.logger, + "Inserting my own info. Unoptimized subtree map is {:?}", + &unoptimized_map + ); // acquire the optimized info... let optimized_map = if let Some(parent) = comm.neighborhood.parent { // ... as a message from my parent - log!(cu.logger, "Forwarding gathered info to parent {:?}", parent); + log!(cu.inner.logger, "Forwarding gathered info to parent {:?}", parent); let msg = S(Sm::SessionGather { unoptimized_map }); comm.endpoint_manager.send_to_setup(parent, &msg)?; 'scatter_loop: loop { log!( - cu.logger, + cu.inner.logger, "Session scatter recv loop. 
awaiting info from children {:?}...", awaiting.iter() ); let (recv_index, msg) = - comm.endpoint_manager.try_recv_any_setup(&mut *cu.logger, deadline)?; - log!(cu.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); + comm.endpoint_manager.try_recv_any_setup(&mut *cu.inner.logger, deadline)?; + log!(cu.inner.logger, "Received from index {:?} msg {:?}", &recv_index, &msg); match msg { S(Sm::SessionScatter { optimized_map }) => { if recv_index != parent { - log!(cu.logger, "I expected the scatter from my parent only!"); + log!(cu.inner.logger, "I expected the scatter from my parent only!"); return Err(Ce::SetupAlgMisbehavior); } break 'scatter_loop optimized_map; } msg @ Msg::CommMsg { .. } => { - log!(cu.logger, "delaying msg {:?} during scatter recv", msg); + log!(cu.inner.logger, "delaying msg {:?} during scatter recv", msg); comm.endpoint_manager.delayed_messages.push((recv_index, msg)); } msg @ S(Sm::SessionGather { .. }) @@ -836,30 +845,30 @@ fn session_optimize( | msg @ S(Sm::MyPortInfo(..)) | msg @ S(Sm::LeaderAnnounce { .. }) | msg @ S(Sm::LeaderWave { .. }) => { - log!(cu.logger, "discarding old message {:?} during election", msg); + log!(cu.inner.logger, "discarding old message {:?} during election", msg); } } } } else { // by computing it myself - log!(cu.logger, "I am the leader! I will optimize this session"); - leader_session_map_optimize(&mut *cu.logger, unoptimized_map)? + log!(cu.inner.logger, "I am the leader! I will optimize this session"); + leader_session_map_optimize(&mut *cu.inner.logger, unoptimized_map)? }; log!( - cu.logger, + cu.inner.logger, "Optimized info map is {:?}. 
Sending to children {:?}", &optimized_map, comm.neighborhood.children.iter() ); - log!(cu.logger, "All session info dumped!: {:#?}", &optimized_map); + log!(cu.inner.logger, "All session info dumped!: {:#?}", &optimized_map); let optimized_info = - optimized_map.get(&cu.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone(); + optimized_map.get(&cu.inner.id_manager.connector_id).expect("HEY NO INFO FOR ME?").clone(); let msg = S(Sm::SessionScatter { optimized_map }); for &child in comm.neighborhood.children.iter() { comm.endpoint_manager.send_to_setup(child, &msg)?; } apply_optimizations(cu, comm, optimized_info)?; - log!(cu.logger, "Session optimizations applied"); + log!(cu.inner.logger, "Session optimizations applied"); Ok(()) } fn leader_session_map_optimize( @@ -882,7 +891,7 @@ fn apply_optimizations( endpoint_incoming_to_getter, } = session_info; // TODO some info which should be read-only can be mutated with the current scheme - cu.port_info = port_info; + cu.inner.port_info = port_info; cu.proto_components = proto_components; cu.proto_description = serde_proto_description.0; for (ee, getter) in comm