diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs
index e04bb67a030014ca0e1378042b649b7e3eac4597..dc0dc7997d97aeffbd9b22a2a53ae1290bdaa81a 100644
--- a/src/runtime/communication.rs
+++ b/src/runtime/communication.rs
@@ -42,10 +42,8 @@ struct BranchingProtoComponent {
 }
 #[derive(Debug, Clone)]
 struct ProtoComponentBranch {
-    did_put_or_get: HashSet<PortId>,
-    inbox: HashMap<PortId, Payload>,
     state: ComponentState,
-    untaken_choice: Option<u16>,
+    inner: ProtoComponentBranchInner,
     ended: bool,
 }
 struct CyclicDrainer<'a, K: Eq + Hash, V> {
@@ -146,11 +144,11 @@ impl Connector {
         expect_polarity: Polarity,
     ) -> Result<&mut NativeBatch, PortOpError> {
         use PortOpError as Poe;
-        let Self { unphased, phased } = self;
-        if !unphased.native_ports.contains(&port) {
+        let Self { unphased: cu, phased } = self;
+        if !cu.inner.native_ports.contains(&port) {
             return Err(Poe::PortUnavailable);
         }
-        match unphased.port_info.polarities.get(&port) {
+        match cu.inner.port_info.polarities.get(&port) {
             Some(p) if *p == expect_polarity => {}
             Some(_) => return Err(Poe::WrongPolarity),
             None => return Err(Poe::UnknownPolarity),
@@ -190,7 +188,7 @@ impl Connector {
             ConnectorPhased::Communication(comm) => {
                 match &comm.round_result {
                     Err(SyncError::Unrecoverable(e)) => {
-                        log!(cu.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e);
+                        log!(cu.inner.logger, "Attempted to start sync round, but previous error {:?} was unrecoverable!", e);
                         return Err(SyncError::Unrecoverable(e.clone()));
                     }
                     _ => {}
@@ -216,7 +214,7 @@ impl Connector {
         use SyncError as Se;
         //////////////////////////////////
         log!(
-            cu.logger,
+            cu.inner.logger,
             "~~~ SYNC called with timeout {:?}; starting round {}",
             &timeout,
             comm.round_index
@@ -228,20 +226,21 @@ impl Connector {
             HashMap::<ProtoComponentId, BranchingProtoComponent>::default();
         let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> =
             cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect();
-        log!(cu.logger, "Nonsync running {} proto components...", unrun_components.len());
+        log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len());
         // drains unrun_components, and populates branching_proto_components.
         while let Some((proto_component_id, mut component)) = unrun_components.pop() {
             // TODO coalesce fields
             log!(
-                cu.logger,
+                cu.inner.logger,
                 "Nonsync running proto component with ID {:?}. {} to go after this",
                 proto_component_id,
                 unrun_components.len()
             );
             let mut ctx = NonsyncProtoContext {
-                logger: &mut *cu.logger,
-                port_info: &mut cu.port_info,
-                id_manager: &mut cu.id_manager,
+                cu_inner: &mut cu.inner,
+                // logger: &mut *cu.inner.logger,
+                // port_info: &mut cu.inner.port_info,
+                // id_manager: &mut cu.inner.id_manager,
                 proto_component_id,
                 unrun_components: &mut unrun_components,
                 proto_component_ports: &mut cu
@@ -252,7 +251,7 @@ impl Connector {
             };
             let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description);
             log!(
-                cu.logger,
+                cu.inner.logger,
                 "proto component {:?} ran to nonsync blocker {:?}",
                 proto_component_id,
                 &blocker
@@ -268,7 +267,7 @@ impl Connector {
             }
         }
         log!(
-            cu.logger,
+            cu.inner.logger,
             "All {} proto components are now done with Nonsync phase",
             branching_proto_components.len(),
         );
@@ -288,26 +287,26 @@ impl Connector {
                     .map(|&index| SubtreeId::NetEndpoint { index });
                 let subtree_id_iter = n.chain(c).chain(e);
                 log!(
-                    cu.logger,
+                    cu.inner.logger,
                     "Children in subtree are: {:?}",
                     subtree_id_iter.clone().collect::<Vec<_>>()
                 );
                 SolutionStorage::new(subtree_id_iter)
             },
-            spec_var_stream: cu.id_manager.new_spec_var_stream(),
+            spec_var_stream: cu.inner.id_manager.new_spec_var_stream(),
             getter_buffer: Default::default(),
             deadline: timeout.map(|to| Instant::now() + to),
         };
-        log!(cu.logger, "Round context structure initialized");
+        log!(cu.inner.logger, "Round context structure initialized");
         // Explore all native branches eagerly. Find solutions, buffer messages, etc.
         log!(
-            cu.logger,
+            cu.inner.logger,
             "Translating {} native batches into branches...",
             comm.native_batches.len()
         );
         let native_branch_spec_var = rctx.spec_var_stream.next();
-        log!(cu.logger, "Native branch spec var is {:?}", native_branch_spec_var);
+        log!(cu.inner.logger, "Native branch spec var is {:?}", native_branch_spec_var);
         let mut branching_native = BranchingNative { branches: Default::default() };
         'native_branches: for ((native_branch, index), branch_spec_val) in
             comm.native_batches.drain(..).zip(0..).zip(SpecVal::iter_domain())
@@ -319,37 +318,42 @@ impl Connector {
                 let firing_ports: HashSet<PortId> =
                     to_get.iter().chain(to_put.keys()).copied().collect();
                 for &port in to_get.iter().chain(to_put.keys()) {
-                    let var = cu.port_info.spec_var_for(port);
+                    let var = cu.inner.port_info.spec_var_for(port);
                     predicate.assigned.insert(var, SpecVal::FIRING);
                 }
                 // assign falses for all silent (not firing) ports
-                for &port in cu.native_ports.difference(&firing_ports) {
-                    let var = cu.port_info.spec_var_for(port);
+                for &port in cu.inner.native_ports.difference(&firing_ports) {
+                    let var = cu.inner.port_info.spec_var_for(port);
                     if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) {
-                        log!(cu.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
+                        log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var);
                         continue 'native_branches;
                    }
                }
                // this branch is consistent. distinguish it with a unique var:val mapping and proceed
                predicate.inserted(native_branch_spec_var, branch_spec_val)
            };
-            log!(cu.logger, "Native branch index={:?} has consistent {:?}", index, &predicate);
+            log!(
+                cu.inner.logger,
+                "Native branch index={:?} has consistent {:?}",
+                index,
+                &predicate
+            );
             // send all outgoing messages (by buffering them)
             for (putter, payload) in to_put {
                 let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
-                log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg);
+                log!(cu.inner.logger, "Native branch {} sending msg {:?}", index, &msg);
                 rctx.getter_buffer.putter_add(cu, putter, msg);
             }
             let branch = NativeBranch { index, gotten: Default::default(), to_get };
             if branch.is_ended() {
                 log!(
-                    cu.logger,
+                    cu.inner.logger,
                     "Native submitting solution for batch {} with {:?}",
                     index,
                     &predicate
                 );
                 rctx.solution_storage.submit_and_digest_subtree_solution(
-                    &mut *cu.logger,
+                    &mut *cu.inner.logger,
                     SubtreeId::LocalComponent(ComponentId::Native),
                     predicate.clone(),
                 );
@@ -362,7 +366,8 @@ impl Connector {
         // restore the invariant: !native_batches.is_empty()
         comm.native_batches.push(Default::default());
-        comm.endpoint_manager.udp_endpoints_round_start(&mut *cu.logger, &mut rctx.spec_var_stream);
+        comm.endpoint_manager
+            .udp_endpoints_round_start(&mut *cu.inner.logger, &mut rctx.spec_var_stream);
         // Call to another big method; keep running this round until a distributed decision is reached
         let decision = Self::sync_reach_decision(
             cu,
@@ -371,8 +376,8 @@ impl Connector {
             &mut branching_proto_components,
             &mut rctx,
         )?;
-        log!(cu.logger, "Committing to decision {:?}!", &decision);
-        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.logger, &decision)?;
+        log!(cu.inner.logger, "Committing to decision {:?}!", &decision);
+        comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?;
 
         // propagate the decision to children
         let msg = Msg::CommMsg(CommMsg {
@@ -382,7 +387,7 @@ impl Connector {
             }),
         });
         log!(
-            cu.logger,
+            cu.inner.logger,
             "Announcing decision {:?} through child endpoints {:?}",
             &msg,
             &comm.neighborhood.children
@@ -405,15 +410,15 @@ impl Connector {
                     .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))),
                 );
                 log!(
-                    cu.logger,
+                    cu.inner.logger,
                     "End round with (updated) component states {:?}",
                     cu.proto_components.keys()
                 );
                 // consume native
-                Ok(Some(branching_native.collapse_with(&mut *cu.logger, &predicate)))
+                Ok(Some(branching_native.collapse_with(&mut *cu.inner.logger, &predicate)))
             }
         };
-        log!(cu.logger, "Sync round ending! Cleaning up");
+        log!(cu.inner.logger, "Sync round ending! Cleaning up");
         // dropping {solution_storage, payloads_to_get}
         ret
     }
@@ -427,22 +432,22 @@ impl Connector {
     ) -> Result<Decision, UnrecoverableSyncError> {
         let mut already_requested_failure = false;
         if branching_native.branches.is_empty() {
-            log!(cu.logger, "Native starts with no branches! Failure!");
+            log!(cu.inner.logger, "Native starts with no branches! Failure!");
             match comm.neighborhood.parent {
                 Some(parent) => {
                     if already_requested_failure.replace_with_true() {
                         Self::request_failure(cu, comm, parent)?
                     } else {
-                        log!(cu.logger, "Already requested failure");
+                        log!(cu.inner.logger, "Already requested failure");
                    }
                }
                None => {
-                    log!(cu.logger, "No parent. Deciding on failure");
+                    log!(cu.inner.logger, "No parent. Deciding on failure");
                    return Ok(Decision::Failure);
                }
            }
        }
-        log!(cu.logger, "Done translating native batches into branches");
+        log!(cu.inner.logger, "Done translating native batches into branches");
 
         let mut pcb_temps_owner = <[HashMap<Predicate, ProtoComponentBranch>; 3]>::default();
         let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner);
@@ -450,7 +455,7 @@ impl Connector {
         // run all proto components to their sync blocker
         log!(
-            cu.logger,
+            cu.inner.logger,
             "Running all {} proto components to their sync blocker...",
             branching_proto_components.len()
         );
@@ -472,43 +477,47 @@ impl Connector {
             // swap the blocked branches back
             std::mem::swap(blocked.0, branches);
             if branches.is_empty() {
-                log!(cu.logger, "{:?} has become inconsistent!", proto_component_id);
+                log!(cu.inner.logger, "{:?} has become inconsistent!", proto_component_id);
                 if let Some(parent) = comm.neighborhood.parent {
                     if already_requested_failure.replace_with_true() {
                         Self::request_failure(cu, comm, parent)?
                     } else {
-                        log!(cu.logger, "Already requested failure");
+                        log!(cu.inner.logger, "Already requested failure");
                    }
                } else {
-                    log!(cu.logger, "As the leader, deciding on timeout");
+                    log!(cu.inner.logger, "As the leader, deciding on timeout");
                    return Ok(Decision::Failure);
                }
            }
        }
-        log!(cu.logger, "All proto components are blocked");
+        log!(cu.inner.logger, "All proto components are blocked");
 
-        log!(cu.logger, "Entering decision loop...");
+        log!(cu.inner.logger, "Entering decision loop...");
         comm.endpoint_manager.undelay_all();
         'undecided: loop {
             // drain payloads_to_get, sending them through endpoints / feeding them to components
-            log!(cu.logger, "Decision loop! have {} messages to recv", rctx.getter_buffer.len());
+            log!(
+                cu.inner.logger,
+                "Decision loop! have {} messages to recv",
+                rctx.getter_buffer.len()
+            );
             while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() {
-                assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
-                let route = cu.port_info.routes.get(&getter);
+                assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter));
+                let route = cu.inner.port_info.routes.get(&getter);
                 log!(
-                    cu.logger,
+                    cu.inner.logger,
                     "Routing msg {:?} to {:?} via {:?}",
                     &send_payload_msg,
                     getter,
                     &route
                 );
                 match route {
-                    None => log!(cu.logger, "Delivery failed. Physical route unmapped!"),
+                    None => log!(cu.inner.logger, "Delivery failed. Physical route unmapped!"),
                     Some(Route::UdpEndpoint { index }) => {
                         let udp_endpoint_ext =
                             &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[*index];
                         let SendPayloadMsg { predicate, payload } = send_payload_msg;
-                        log!(cu.logger, "Delivering to udp endpoint index={}", index);
+                        log!(cu.inner.logger, "Delivering to udp endpoint index={}", index);
                         udp_endpoint_ext.outgoing_payloads.insert(predicate, payload);
                     }
                     Some(Route::NetEndpoint { index }) => {
@@ -540,7 +549,7 @@ impl Connector {
                         )?;
                         if branching_component.branches.is_empty() {
                             log!(
-                                cu.logger,
+                                cu.inner.logger,
                                 "{:?} has become inconsistent!",
                                 proto_component_id
                             );
@@ -548,16 +557,16 @@ impl Connector {
                             if let Some(parent) = comm.neighborhood.parent {
                                 if already_requested_failure.replace_with_true() {
                                     Self::request_failure(cu, comm, parent)?
                                } else {
-                                    log!(cu.logger, "Already requested failure");
+                                    log!(cu.inner.logger, "Already requested failure");
                                }
                            } else {
-                                log!(cu.logger, "As the leader, deciding on timeout");
+                                log!(cu.inner.logger, "As the leader, deciding on timeout");
                                return Ok(Decision::Failure);
                            }
                        }
                    } else {
                        log!(
-                            cu.logger,
+                            cu.inner.logger,
                            "Delivery to getter {:?} msg {:?} failed because {:?} isn't here",
                            getter,
                            &send_payload_msg,
@@ -569,12 +578,12 @@ impl Connector {
            }
 
            // check if we have a solution yet
-            log!(cu.logger, "Check if we have any local decisions...");
+            log!(cu.inner.logger, "Check if we have any local decisions...");
            for solution in rctx.solution_storage.iter_new_local_make_old() {
-                log!(cu.logger, "New local decision with solution {:?}...", &solution);
+                log!(cu.inner.logger, "New local decision with solution {:?}...", &solution);
                match comm.neighborhood.parent {
                    Some(parent) => {
-                        log!(cu.logger, "Forwarding to my parent {:?}", parent);
+                        log!(cu.inner.logger, "Forwarding to my parent {:?}", parent);
                        let suggestion = Decision::Success(solution);
                        let msg = Msg::CommMsg(CommMsg {
                            round_index: comm.round_index,
@@ -585,7 +594,7 @@ impl Connector {
                        comm.endpoint_manager.send_to_comms(parent, &msg)?;
                    }
                    None => {
-                        log!(cu.logger, "No parent. Deciding on solution {:?}", &solution);
+                        log!(cu.inner.logger, "No parent. Deciding on solution {:?}", &solution);
                        return Ok(Decision::Success(solution));
                    }
                }
@@ -593,35 +602,36 @@ impl Connector {
 
            // stuck! make progress by receiving a msg
            // try recv messages arriving through endpoints
-            log!(cu.logger, "No decision yet. Let's recv an endpoint msg...");
+            log!(cu.inner.logger, "No decision yet. Let's recv an endpoint msg...");
            {
-                let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) =
-                    match comm.endpoint_manager.try_recv_any_comms(
-                        &mut *cu.logger,
-                        &cu.port_info,
-                        rctx,
-                        comm.round_index,
-                    )? {
-                        CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg),
-                        CommRecvOk::NewPayloadMsgs => continue 'undecided,
-                        CommRecvOk::TimeoutWithoutNew => {
-                            log!(cu.logger, "Reached user-defined deadling without decision...");
-                            if let Some(parent) = comm.neighborhood.parent {
-                                if already_requested_failure.replace_with_true() {
-                                    Self::request_failure(cu, comm, parent)?
-                                } else {
-                                    log!(cu.logger, "Already requested failure");
-                                }
+                let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm
+                    .endpoint_manager
+                    .try_recv_any_comms(
+                        &mut *cu.inner.logger,
+                        &cu.inner.port_info,
+                        rctx,
+                        comm.round_index,
+                    )? {
+                    CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg),
+                    CommRecvOk::NewPayloadMsgs => continue 'undecided,
+                    CommRecvOk::TimeoutWithoutNew => {
+                        log!(cu.inner.logger, "Reached user-defined deadline without decision...");
+                        if let Some(parent) = comm.neighborhood.parent {
+                            if already_requested_failure.replace_with_true() {
+                                Self::request_failure(cu, comm, parent)?
                            } else {
-                                log!(cu.logger, "As the leader, deciding on timeout");
-                                return Ok(Decision::Failure);
+                                log!(cu.inner.logger, "Already requested failure");
                            }
-                            rctx.deadline = None;
-                            continue 'undecided;
+                        } else {
+                            log!(cu.inner.logger, "As the leader, deciding on timeout");
+                            return Ok(Decision::Failure);
                        }
-                    };
+                        rctx.deadline = None;
+                        continue 'undecided;
+                    }
+                };
                log!(
-                    cu.logger,
+                    cu.inner.logger,
                    "Received from endpoint {} ctrl msg {:?}",
                    net_index,
                    &comm_ctrl_msg
@@ -633,10 +643,14 @@ impl Connector {
                        match suggestion {
                            Decision::Success(predicate) => {
                                // child solution contributes to local solution
-                                log!(cu.logger, "Child provided solution {:?}", &predicate);
+                                log!(
+                                    cu.inner.logger,
+                                    "Child provided solution {:?}",
+                                    &predicate
+                                );
                                let subtree_id = SubtreeId::NetEndpoint { index: net_index };
                                rctx.solution_storage.submit_and_digest_subtree_solution(
-                                    &mut *cu.logger,
+                                    &mut *cu.inner.logger,
                                    subtree_id,
                                    predicate,
                                );
@@ -644,15 +658,15 @@ impl Connector {
                            Decision::Failure => {
                                match comm.neighborhood.parent {
                                    None => {
-                                        log!(cu.logger, "I decide on my child's failure");
+                                        log!(cu.inner.logger, "I decide on my child's failure");
                                        break 'undecided Ok(Decision::Failure);
                                    }
                                    Some(parent) => {
-                                        log!(cu.logger, "Forwarding failure through my parent endpoint {:?}", parent);
+                                        log!(cu.inner.logger, "Forwarding failure through my parent endpoint {:?}", parent);
                                        if already_requested_failure.replace_with_true() {
                                            Self::request_failure(cu, comm, parent)?
                                        } else {
-                                            log!(cu.logger, "Already requested failure");
+                                            log!(cu.inner.logger, "Already requested failure");
                                        }
                                    }
                                }
@@ -660,7 +674,7 @@ impl Connector {
                            }
                        } else {
                            log!(
-                                cu.logger,
+                                cu.inner.logger,
                                "Discarding suggestion {:?} from non-child endpoint idx {:?}",
                                &suggestion,
                                net_index
@@ -673,7 +687,7 @@ impl Connector {
                            return Ok(decision);
                        } else {
                            log!(
-                                cu.logger,
+                                cu.inner.logger,
                                "Discarding announcement {:?} from non-parent endpoint idx {:?}",
                                &decision,
                                net_index
@@ -682,7 +696,7 @@ impl Connector {
                        }
                    }
                }
            }
-            log!(cu.logger, "Endpoint msg recv done");
+            log!(cu.inner.logger, "Endpoint msg recv done");
        }
    }
    fn request_failure(
@@ -690,7 +704,7 @@ impl Connector {
        comm: &mut ConnectorCommunication,
        parent: usize,
    ) -> Result<(), UnrecoverableSyncError> {
-        log!(cu.logger, "Forwarding to my parent {:?}", parent);
+        log!(cu.inner.logger, "Forwarding to my parent {:?}", parent);
        let suggestion = Decision::Failure;
        let msg = Msg::CommMsg(CommMsg {
            round_index: comm.round_index,
@@ -713,35 +727,35 @@ impl BranchingNative {
        send_payload_msg: &SendPayloadMsg,
        bn_temp: MapTempGuard<'_, Predicate, NativeBranch>,
    ) {
-        log!(cu.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
-        assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter));
+        log!(cu.inner.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg);
+        assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter));
        let mut draining = bn_temp;
        let finished = &mut self.branches;
        std::mem::swap(draining.0, finished);
        for (predicate, mut branch) in draining.drain() {
-            log!(cu.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
+            log!(cu.inner.logger, "visiting native branch {:?} with {:?}", &branch, &predicate);
            // check if this branch expects to receive it
-            let var = cu.port_info.spec_var_for(getter);
+            let var = cu.inner.port_info.spec_var_for(getter);
            let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| {
                branch.to_get.remove(&getter);
                let was = branch.gotten.insert(getter, send_payload_msg.payload.clone());
                assert!(was.is_none());
                if branch.is_ended() {
                    log!(
-                        cu.logger,
+                        cu.inner.logger,
                        "new native solution with {:?} is_ended() with gotten {:?}",
                        &predicate,
                        &branch.gotten
                    );
                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Native);
                    round_ctx.solution_storage.submit_and_digest_subtree_solution(
-                        &mut *cu.logger,
+                        &mut *cu.inner.logger,
                        subtree_id,
                        predicate.clone(),
                    );
                } else {
                    log!(
-                        cu.logger,
+                        cu.inner.logger,
                        "Fed native {:?} still has to_get {:?}",
                        &predicate,
                        &branch.to_get
@@ -751,7 +765,7 @@ impl BranchingNative {
            if predicate.query(var) != Some(SpecVal::FIRING) {
                // optimization. Don't bother trying this branch
                log!(
-                    cu.logger,
+                    cu.inner.logger,
                    "skipping branch with {:?} that doesn't want the message (fastpath)",
                    &predicate
                );
@@ -763,7 +777,7 @@ impl BranchingNative {
                Aur::Nonexistant => {
                    // this branch does not receive the message
                    log!(
-                        cu.logger,
+                        cu.inner.logger,
                        "skipping branch with {:?} that doesn't want the message (slowpath)",
                        &predicate
                    );
@@ -772,7 +786,7 @@ impl BranchingNative {
                Aur::Equivalent | Aur::FormerNotLatter => {
                    // retain the existing predicate, but add this payload
                    feed_branch(&mut branch, &predicate);
-                    log!(cu.logger, "branch pred covers it! Accept the msg");
+                    log!(cu.inner.logger, "branch pred covers it! Accept the msg");
                    Self::insert_branch_merging(finished, predicate, branch);
                }
                Aur::LatterNotFormer => {
@@ -781,7 +795,7 @@ impl BranchingNative {
                    let predicate2 = send_payload_msg.predicate.clone();
                    feed_branch(&mut branch2, &predicate2);
                    log!(
-                        cu.logger,
+                        cu.inner.logger,
                        "payload pred {:?} covers branch pred {:?}",
                        &predicate2,
                        &predicate
@@ -794,7 +808,7 @@ impl BranchingNative {
                    let mut branch2 = branch.clone();
                    feed_branch(&mut branch2, &predicate2);
                    log!(
-                        cu.logger,
+                        cu.inner.logger,
                        "new subsuming pred created {:?}. forking and feeding",
                        &predicate2
                    );
@@ -864,16 +878,13 @@ impl BranchingProtoComponent {
    ) -> Result<(), UnrecoverableSyncError> {
        cd.cyclic_drain(|mut predicate, mut branch, mut drainer| {
            let mut ctx = SyncProtoContext {
-                untaken_choice: &mut branch.untaken_choice,
-                logger: &mut *cu.logger,
+                cu_inner: &mut cu.inner,
                predicate: &predicate,
-                port_info: &cu.port_info,
-                inbox: &branch.inbox,
-                did_put_or_get: &mut branch.did_put_or_get,
+                branch_inner: &mut branch.inner,
            };
            let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description);
            log!(
-                cu.logger,
+                cu.inner.logger,
                "Proto component with id {:?} branch with pred {:?} hit blocker {:?}",
                proto_component_id,
                &predicate,
                &blocker
@@ -886,7 +897,7 @@ impl BranchingProtoComponent {
                    for val in SpecVal::iter_domain().take(n as usize) {
                        let pred = predicate.clone().inserted(var, val);
                        let mut branch_n = branch.clone();
-                        branch_n.untaken_choice = Some(val.0);
+                        branch_n.inner.untaken_choice = Some(val.0);
                        drainer.add_input(pred, branch_n);
                    }
                }
@@ -897,12 +908,12 @@ impl BranchingProtoComponent {
                B::SyncBlockEnd => {
                    // make concrete all variables
                    for port in ports.iter() {
-                        let var = cu.port_info.spec_var_for(*port);
-                        let should_have_fired = branch.did_put_or_get.contains(port);
+                        let var = cu.inner.port_info.spec_var_for(*port);
+                        let should_have_fired = branch.inner.did_put_or_get.contains(port);
                        let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT);
                        let did_fire = val == SpecVal::FIRING;
                        if did_fire != should_have_fired {
-                            log!(cu.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired);
+                            log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired);
                            // IMPLICIT inconsistency
                            drop((predicate, branch));
                            return Ok(());
@@ -911,7 +922,7 @@ impl BranchingProtoComponent {
                    // submit solution for this component
                    let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id));
                    rctx.solution_storage.submit_and_digest_subtree_solution(
-                        &mut *cu.logger,
+                        &mut *cu.inner.logger,
                        subtree_id,
                        predicate.clone(),
                    );
@@ -921,12 +932,12 @@ impl BranchingProtoComponent {
                }
                B::CouldntReadMsg(port) => {
                    // move to "blocked"
-                    assert!(!branch.inbox.contains_key(&port));
+                    assert!(!branch.inner.inbox.contains_key(&port));
                    drainer.add_output(predicate, branch);
                }
                B::CouldntCheckFiring(port) => {
                    // sanity check
-                    let var = cu.port_info.spec_var_for(port);
+                    let var = cu.inner.port_info.spec_var_for(port);
                    assert!(predicate.query(var).is_none());
                    // keep forks in "unblocked"
                    drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone());
@@ -934,18 +945,18 @@ impl BranchingProtoComponent {
                }
                B::PutMsg(putter, payload) => {
                    // sanity check
-                    assert_eq!(Some(&Putter), cu.port_info.polarities.get(&putter));
+                    assert_eq!(Some(&Putter), cu.inner.port_info.polarities.get(&putter));
                    // overwrite assignment
-                    let var = cu.port_info.spec_var_for(putter);
+                    let var = cu.inner.port_info.spec_var_for(putter);
                    let was = predicate.assigned.insert(var, SpecVal::FIRING);
                    if was == Some(SpecVal::SILENT) {
-                        log!(cu.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
+                        log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var);
                        // discard forever
                        drop((predicate, branch));
                    } else {
                        // keep in "unblocked"
-                        branch.did_put_or_get.insert(putter);
-                        log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
+                        branch.inner.did_put_or_get.insert(putter);
+                        log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var);
                        let msg = SendPayloadMsg { predicate: predicate.clone(), payload };
                        rctx.getter_buffer.putter_add(cu, putter, msg);
                        drainer.add_input(predicate, branch);
@@ -974,7 +985,7 @@ impl BranchingProtoComponent {
        send_payload_msg: &SendPayloadMsg,
        pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>,
    ) -> Result<(), UnrecoverableSyncError> {
-        let logger = &mut *cu.logger;
+        let logger = &mut *cu.inner.logger;
        log!(
            logger,
            "feeding proto component {:?} getter {:?} {:?}",
@@ -1039,7 +1050,7 @@ impl BranchingProtoComponent {
        )?;
        // swap the blocked branches back
        std::mem::swap(blocked.0, branches);
-        log!(cu.logger, "component settles down with branches: {:?}", branches.keys());
+        log!(cu.inner.logger, "component settles down with branches: {:?}", branches.keys());
        Ok(())
    }
    fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent {
@@ -1053,13 +1064,7 @@ impl BranchingProtoComponent {
        panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate);
    }
    fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self {
-        let branch = ProtoComponentBranch {
-            inbox: Default::default(),
-            did_put_or_get: Default::default(),
-            state,
-            ended: false,
-            untaken_choice: None,
-        };
+        let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false };
        Self { ports, branches: hashmap! { Predicate::default() => branch } }
    }
 }
@@ -1172,44 +1177,44 @@ impl GetterBuffer {
        self.getters_and_sends.push((getter, msg));
    }
    fn putter_add(&mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) {
-        if let Some(&getter) = cu.port_info.peers.get(&putter) {
+        if let Some(&getter) = cu.inner.port_info.peers.get(&putter) {
            self.getter_add(getter, msg);
        } else {
-            log!(cu.logger, "Putter {:?} has no known peer!", putter);
+            log!(cu.inner.logger, "Putter {:?} has no known peer!", putter);
            panic!("Putter {:?} has no known peer!", putter);
        }
    }
 }
 impl SyncProtoContext<'_> {
    pub(crate) fn is_firing(&mut self, port: PortId) -> Option<bool> {
-        let var = self.port_info.spec_var_for(port);
+        let var = self.cu_inner.port_info.spec_var_for(port);
        self.predicate.query(var).map(SpecVal::is_firing)
    }
    pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> {
-        self.did_put_or_get.insert(port);
-        self.inbox.get(&port)
+        self.branch_inner.did_put_or_get.insert(port);
+        self.branch_inner.inbox.get(&port)
    }
    pub(crate) fn take_choice(&mut self) -> Option<u16> {
-        self.untaken_choice.take()
+        self.branch_inner.untaken_choice.take()
    }
 }
 impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> {
    fn add_input(&mut self, k: K, v: V) {
        self.swap.insert(k, v);
    }
-    fn merge_input_with<F: Fn(V, &V) -> V>(&mut self, k: K, v: V, mut func: F) {
-        use std::collections::hash_map::Entry;
-        let e = self.swap.entry(k);
-        match e {
-            Entry::Vacant(ev) => {
-                ev.insert(v);
-            }
-            Entry::Occupied(mut eo) => {
-                let old = eo.get_mut();
-                *old = func(v, old);
-            }
-        }
-    }
+    // fn merge_input_with<F: Fn(V, &V) -> V>(&mut self, k: K, v: V, mut func: F) {
+    //     use std::collections::hash_map::Entry;
+    //     let e = self.swap.entry(k);
+    //     match e {
+    //         Entry::Vacant(ev) => {
+    //             ev.insert(v);
+    //         }
+    //         Entry::Occupied(mut eo) => {
+    //             let old = eo.get_mut();
+    //             *old = func(v, old);
+    //         }
+    //     }
+    // }
    fn add_output(&mut self, k: K, v: V) {
        self.output.insert(k, v);
    }
@@ -1219,7 +1224,7 @@ impl NonsyncProtoContext<'_> {
        // called by a PROTO COMPONENT. moves its own ports.
        // 1. sanity check: this component owns these ports
        log!(
-            self.logger,
+            self.cu_inner.logger,
            "Component {:?} added new component with state {:?}, moving ports {:?}",
            self.proto_component_id,
            &state,
@@ -1227,29 +1232,33 @@ impl NonsyncProtoContext<'_> {
        );
        assert!(self.proto_component_ports.is_subset(&moved_ports));
        // 2. remove ports from old component & update port->route
-        let new_id = self.id_manager.new_proto_component_id();
+        let new_id = self.cu_inner.id_manager.new_proto_component_id();
        for port in moved_ports.iter() {
            self.proto_component_ports.remove(port);
-            self.port_info.routes.insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
+            self.cu_inner
+                .port_info
+                .routes
+                .insert(*port, Route::LocalComponent(ComponentId::Proto(new_id)));
        }
        // 3. create a new component
        self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports }));
    }
    pub fn new_port_pair(&mut self) -> [PortId; 2] {
        // adds two new associated ports, related to each other, and exposed to the proto component
-        let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()];
+        let [o, i] =
+            [self.cu_inner.id_manager.new_port_id(), self.cu_inner.id_manager.new_port_id()];
        self.proto_component_ports.insert(o);
        self.proto_component_ports.insert(i);
        // {polarity, peer, route} known. {} unknown.
-        self.port_info.polarities.insert(o, Putter);
-        self.port_info.polarities.insert(i, Getter);
-        self.port_info.peers.insert(o, i);
-        self.port_info.peers.insert(i, o);
+        self.cu_inner.port_info.polarities.insert(o, Putter);
+        self.cu_inner.port_info.polarities.insert(i, Getter);
+        self.cu_inner.port_info.peers.insert(o, i);
+        self.cu_inner.port_info.peers.insert(i, o);
        let route = Route::LocalComponent(ComponentId::Proto(self.proto_component_id));
-        self.port_info.routes.insert(o, route);
-        self.port_info.routes.insert(i, route);
+        self.cu_inner.port_info.routes.insert(o, route);
+        self.cu_inner.port_info.routes.insert(i, route);
        log!(
-            self.logger,
+            self.cu_inner.logger,
            "Component {:?} port pair (out->in) {:?} -> {:?}",
            self.proto_component_id,
            o,
@@ -1260,7 +1269,7 @@ impl NonsyncProtoContext<'_> {
 }
 impl ProtoComponentBranch {
    fn feed_msg(&mut self, getter: PortId, payload: Payload) {
-        let was = self.inner.inbox.insert(getter, payload);
+        let was = self.inner.inbox.insert(getter, payload);
        assert!(was.is_none())
    }
 }
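
Note: the definitions of ConnectorUnphasedInner (the struct behind the new `cu.inner` field) and ProtoComponentBranchInner are not shown in this file's diff; presumably they are introduced elsewhere in the same change set. The sketch below is not part of the patch. It is reconstructed from the call sites in the hunks above, and the exact field types are assumptions; only the field names and access paths are proven by the diff.

    // Sketch only: inferred from usage above, not copied from the patch.
    // Logger, IdManager, PortInfo, PortId, Payload are assumed to be the
    // crate's existing types.

    // Fields the old code reached directly on ConnectorUnphased (logger,
    // id_manager, native_ports, port_info), grouped so helper contexts can
    // borrow them together through a single `&mut cu.inner`:
    struct ConnectorUnphasedInner {
        logger: Box<dyn Logger>,        // used via `&mut *cu.inner.logger` above
        id_manager: IdManager,          // new_port_id / new_proto_component_id / new_spec_var_stream
        native_ports: HashSet<PortId>,  // contains() / difference() in the native port ops
        port_info: PortInfo,            // polarities, peers, routes, spec_var_for
    }

    // Speculative per-branch state split out of ProtoComponentBranch.
    // `inner: Default::default()` in BranchingProtoComponent::initial implies
    // a Default impl; Debug + Clone follow from ProtoComponentBranch's derives.
    #[derive(Debug, Clone, Default)]
    struct ProtoComponentBranchInner {
        did_put_or_get: HashSet<PortId>,
        inbox: HashMap<PortId, Payload>,
        untaken_choice: Option<u16>,
    }

The apparent payoff is borrow granularity: NonsyncProtoContext previously carried three separate `&mut` borrows of ConnectorUnphased fields, so every context and helper signature had to enumerate them. With the grouped struct, a context takes one `cu_inner: &mut cu.inner` that remains disjoint from the caller's `&mut cu.proto_components` borrow (visible above, where the same context also holds `proto_component_ports: &mut cu.proto_components...`), and SyncProtoContext likewise pairs `cu_inner` with the per-branch `branch_inner`.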