diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 06e451f7436615250460f56cd70db10e6bd25d04..4ef22a5789e6fe957ccfff8cfc4c5722ac34f07d 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -45,6 +45,8 @@ impl ReplaceBoolTrue for bool { !was } } +// CuUndecided provides a mostly immutable view into the ConnectorUnphased structure, +// making it harder to accidentally mutate its contents in a way that cannot be rolled back. impl CuUndecided for ConnectorUnphased { fn logger(&mut self) -> &mut dyn Logger { &mut *self.inner.logger @@ -161,7 +163,7 @@ impl Connector { ConnectorPhased::Communication(comm) => { match &comm.round_result { Err(SyncError::Unrecoverable(e)) => { - log!(cu.inner.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e); + log!(cu.logger(), "Attempted to start sync round, but previous error {:?} was unrecoverable!", e); return Err(SyncError::Unrecoverable(e.clone())); } _ => {} @@ -187,18 +189,18 @@ impl Connector { use SyncError as Se; ////////////////////////////////// - log!(@MARK, cu.inner.logger, "sync start {}", comm.round_index); + log!(@MARK, cu.logger(), "sync start {}", comm.round_index); log!( - cu.inner.logger, + cu.logger(), "~~~ SYNC called with timeout {:?}; starting round {}", &timeout, comm.round_index ); - log!(@BENCH, cu.inner.logger, ""); + log!(@BENCH, cu.logger(), ""); // 1. run all proto components to Nonsync blockers // iterate - let current_state = cu.inner.current_state.clone(); + let mut current_state = cu.inner.current_state.clone(); let mut branching_proto_components = HashMap::::default(); let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu @@ -206,23 +208,24 @@ impl Connector { .iter() .map(|(&proto_id, proto)| (proto_id, proto.clone())) .collect(); - log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len()); + log!(cu.logger(), "Nonsync running {} proto components...", unrun_components.len()); // drains unrun_components, and populates branching_proto_components. while let Some((proto_component_id, mut component)) = unrun_components.pop() { log!( - cu.inner.logger, + cu.logger(), "Nonsync running proto component with ID {:?}. {} to go after this", proto_component_id, unrun_components.len() ); let mut ctx = NonsyncProtoContext { - cu_inner: &mut cu.inner, + current_state: &mut current_state, + logger: &mut *cu.inner.logger, proto_component_id, unrun_components: &mut unrun_components, }; let blocker = component.nonsync_run(&mut ctx, &cu.proto_description); log!( - cu.inner.logger, + cu.logger(), "proto component {:?} ran to nonsync blocker {:?}", proto_component_id, &blocker @@ -238,11 +241,11 @@ impl Connector { } } log!( - cu.inner.logger, + cu.logger(), "All {} proto components are now done with Nonsync phase", branching_proto_components.len(), ); - log!(@BENCH, cu.inner.logger, ""); + log!(@BENCH, cu.logger(), ""); // Create temp structures needed for the synchronous phase of the round let mut rctx = RoundCtx { @@ -267,17 +270,17 @@ impl Connector { payload_inbox: Default::default(), deadline: timeout.map(|to| Instant::now() + to), }; - log!(cu.inner.logger, "Round context structure initialized"); - log!(@BENCH, cu.inner.logger, ""); + log!(cu.logger(), "Round context structure initialized"); + log!(@BENCH, cu.logger(), ""); // Explore all native branches eagerly. Find solutions, buffer messages, etc. log!( - cu.inner.logger, + cu.logger(), "Translating {} native batches into branches...", comm.native_batches.len() ); let native_spec_var = rctx.spec_var_stream.next(); - log!(cu.inner.logger, "Native branch spec var is {:?}", native_spec_var); + log!(cu.logger(), "Native branch spec var is {:?}", native_spec_var); let mut branching_native = BranchingNative { branches: Default::default() }; 'native_branches: for ((native_branch, index), branch_spec_val) in comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain()) @@ -289,7 +292,7 @@ impl Connector { let firing_iter = to_get.iter().chain(to_put.keys()).copied(); log!( - cu.inner.logger, + cu.logger(), "New native with firing ports {:?}", firing_iter.clone().collect::>() ); @@ -310,24 +313,19 @@ impl Connector { } let var = cu.inner.current_state.spec_var_for(*port); if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { - log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); + log!(cu.logger(), "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); continue 'native_branches; } } // this branch is consistent. distinguish it with a unique var:val mapping and proceed predicate.inserted(native_spec_var, branch_spec_val) }; - log!( - cu.inner.logger, - "Native branch index={:?} has consistent {:?}", - index, - &predicate - ); + log!(cu.logger(), "Native branch index={:?} has consistent {:?}", index, &predicate); // send all outgoing messages (by buffering them) for (putter, payload) in to_put { let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; log!( - cu.inner.logger, + cu.logger(), "Native branch {} sending msg {:?} with putter {:?}", index, &msg, @@ -340,7 +338,7 @@ impl Connector { let branch = NativeBranch { index, gotten: Default::default(), to_get }; if branch.is_ended() { log!( - cu.inner.logger, + cu.logger(), "Native submitting solution for batch {} with {:?}", index, &predicate @@ -359,8 +357,8 @@ impl Connector { // restore the invariant: !native_batches.is_empty() comm.native_batches.push(Default::default()); // Call to another big method; keep running this round until a distributed decision is reached - log!(cu.inner.logger, "Searching for decision..."); - log!(@BENCH, cu.inner.logger, ""); + log!(cu.logger(), "Searching for decision..."); + log!(@BENCH, cu.logger(), ""); let decision = Self::sync_reach_decision( cu, comm, @@ -368,10 +366,10 @@ impl Connector { &mut branching_proto_components, &mut rctx, )?; - log!(@MARK, cu.inner.logger, "got decision!"); - log!(cu.inner.logger, "Committing to decision {:?}!", &decision); - log!(@BENCH, cu.inner.logger, ""); - comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?; + log!(@MARK, cu.logger(), "got decision!"); + log!(cu.logger(), "Committing to decision {:?}!", &decision); + log!(@BENCH, cu.logger(), ""); + comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger(), &decision)?; // propagate the decision to children let msg = Msg::CommMsg(CommMsg { @@ -381,12 +379,12 @@ impl Connector { }), }); log!( - cu.inner.logger, + cu.logger(), "Announcing decision {:?} through child endpoints {:?}", &msg, &comm.neighborhood.children ); - log!(@MARK, cu.inner.logger, "forwarding decision!"); + log!(@MARK, cu.logger(), "forwarding decision!"); for &child in comm.neighborhood.children.iter() { comm.endpoint_manager.send_to_comms(child, &msg)?; } @@ -412,11 +410,11 @@ impl Connector { cu.proto_components.keys() ); // consume native - Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate))) + Ok(Some(branching_native.collapse_with(&mut *cu.logger(), &predicate))) } }; - log!(cu.inner.logger, "Sync round ending! Cleaning up"); - log!(@BENCH, cu.inner.logger, ""); + log!(cu.logger(), "Sync round ending! Cleaning up"); + log!(@BENCH, cu.logger(), ""); ret } @@ -1166,7 +1164,7 @@ impl NonsyncProtoContext<'_> { // called by a PROTO COMPONENT. moves its own ports. // 1. sanity check: this component owns these ports log!( - self.cu_inner.logger, + self.logger, "Component {:?} added new component with state {:?}, moving ports {:?}", self.proto_component_id, &state, @@ -1177,23 +1175,23 @@ impl NonsyncProtoContext<'_> { for port in moved_ports.iter() { assert_eq!( self.proto_component_id, - self.cu_inner.current_state.port_info.get(port).unwrap().owner + self.current_state.port_info.get(port).unwrap().owner ); } // 2. create new component - let new_cid = self.cu_inner.current_state.id_manager.new_component_id(); + let new_cid = self.current_state.id_manager.new_component_id(); self.unrun_components.push((new_cid, state)); // 3. update ownership of moved ports for port in moved_ports.iter() { - self.cu_inner.current_state.port_info.get_mut(port).unwrap().owner = new_cid; + self.current_state.port_info.get_mut(port).unwrap().owner = new_cid; } // 3. create a new component } pub fn new_port_pair(&mut self) -> [PortId; 2] { // adds two new associated ports, related to each other, and exposed to the proto component - let mut new_cid_fn = || self.cu_inner.current_state.id_manager.new_port_id(); + let mut new_cid_fn = || self.current_state.id_manager.new_port_id(); let [o, i] = [new_cid_fn(), new_cid_fn()]; - self.cu_inner.current_state.port_info.insert( + self.current_state.port_info.insert( o, PortInfo { route: Route::LocalComponent, @@ -1202,7 +1200,7 @@ impl NonsyncProtoContext<'_> { owner: self.proto_component_id, }, ); - self.cu_inner.current_state.port_info.insert( + self.current_state.port_info.insert( i, PortInfo { route: Route::LocalComponent, @@ -1212,7 +1210,7 @@ impl NonsyncProtoContext<'_> { }, ); log!( - self.cu_inner.logger, + self.logger, "Component {:?} port pair (out->in) {:?} -> {:?}", self.proto_component_id, o, diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index e83c3598d94397628c62c52f4bade45a3ee44ee1..3bb91116e2fe5e89700c3fcd581541514f7869b5 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -35,13 +35,14 @@ struct CurrentState { id_manager: IdManager, } pub(crate) struct NonsyncProtoContext<'a> { - cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds + current_state: &'a mut CurrentState, + logger: &'a mut dyn Logger, + // cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds unrun_components: &'a mut Vec<(ComponentId, ComponentState)>, // lives for Nonsync phase - proto_component_id: ComponentId, // KEY in id->component map + proto_component_id: ComponentId, // KEY in id->component map } pub(crate) struct SyncProtoContext<'a> { rctx: &'a RoundCtx, - // cu: &'a mut dyn CuUndecided, branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch predicate: &'a Predicate, // KEY in pred->branch map }