diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 779169ca54a1a4786de5439773d8066b78457ba1..080249437130290a12be7caf72c4bf8183468cb7 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -2,6 +2,16 @@ use super::*; use crate::common::*; //////////////// +#[derive(Default)] +struct GetterBuffer { + getters_and_sends: Vec<(PortId, SendPayloadMsg)>, +} +struct RoundCtx { + solution_storage: SolutionStorage, + spec_var_stream: SpecVarStream, + getter_buffer: GetterBuffer, + deadline: Option, +} struct BranchingNative { branches: HashMap, } @@ -28,6 +38,7 @@ struct BranchingProtoComponent { struct ProtoComponentBranch { inbox: HashMap, state: ComponentState, + untaken_choice: Option, ended: bool, } struct CyclicDrainer<'a, K: Eq + Hash, V> { @@ -38,14 +49,6 @@ struct CyclicDrainInner<'a, K: Eq + Hash, V> { swap: &'a mut HashMap, output: &'a mut HashMap, } -trait PayloadMsgSender { - fn putter_send( - &mut self, - cu: &mut ConnectorUnphased, - putter: PortId, - msg: SendPayloadMsg, - ) -> Result<(), SyncError>; -} trait ReplaceBoolTrue { fn replace_with_true(&mut self) -> bool; } @@ -132,6 +135,7 @@ impl Connector { ConnectorPhased::Setup { .. } => Err(SyncError::NotConnected), ConnectorPhased::Communication(comm) => { comm.round_result = Self::connected_sync(unphased, comm, timeout); + comm.round_index += 1; match &comm.round_result { Ok(None) => unreachable!(), Ok(Some(ok_result)) => Ok(ok_result.batch_index), @@ -148,7 +152,7 @@ impl Connector { timeout: Option, ) -> Result, SyncError> { use SyncError as Se; - let deadline = timeout.map(|to| Instant::now() + to); + // let deadline = timeout.map(|to| Instant::now() + to); log!( cu.logger, "~~~ SYNC called with timeout {:?}; starting round {}", @@ -156,8 +160,6 @@ impl Connector { comm.round_index ); - let mut spec_var_stream = cu.id_manager.new_spec_var_stream(); - // 1. run all proto components to Nonsync blockers let mut branching_proto_components = HashMap::::default(); @@ -207,18 +209,21 @@ impl Connector { branching_proto_components.len(), ); - // NOTE: all msgs in outbox are of form (Getter, Payload) - let mut payloads_to_get: Vec<(PortId, SendPayloadMsg)> = vec![]; - - // create the solution storage - let mut solution_storage = { - let n = std::iter::once(Route::LocalComponent(ComponentId::Native)); - let c = - cu.proto_components.keys().map(|&id| Route::LocalComponent(ComponentId::Proto(id))); - let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index }); - SolutionStorage::new(n.chain(c).chain(e)) + let mut rctx = RoundCtx { + solution_storage: { + let n = std::iter::once(Route::LocalComponent(ComponentId::Native)); + let c = cu + .proto_components + .keys() + .map(|&id| Route::LocalComponent(ComponentId::Proto(id))); + let e = comm.neighborhood.children.iter().map(|&index| Route::Endpoint { index }); + SolutionStorage::new(n.chain(c).chain(e)) + }, + spec_var_stream: cu.id_manager.new_spec_var_stream(), + getter_buffer: Default::default(), + deadline: timeout.map(|to| Instant::now() + to), }; - log!(cu.logger, "Solution storage initialized"); + log!(cu.logger, "Round context structure initialized"); // 2. kick off the native log!( @@ -226,7 +231,7 @@ impl Connector { "Translating {} native batches into branches...", comm.native_batches.len() ); - let native_branch_spec_var = spec_var_stream.next(); + let native_branch_spec_var = rctx.spec_var_stream.next(); log!(cu.logger, "Native branch spec var is {:?}", native_branch_spec_var); let mut branching_native = BranchingNative { branches: Default::default() }; 'native_branches: for ((native_branch, index), branch_spec_val) in @@ -259,7 +264,7 @@ impl Connector { for (putter, payload) in to_put { let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; log!(cu.logger, "Native branch {} sending msg {:?}", index, &msg); - payloads_to_get.putter_send(cu, putter, msg)?; + rctx.getter_buffer.putter_add(cu, putter, msg)?; } if to_get.is_empty() { log!( @@ -268,16 +273,16 @@ impl Connector { index, &predicate ); - solution_storage.submit_and_digest_subtree_solution( + rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.logger, Route::LocalComponent(ComponentId::Native), predicate.clone(), ); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; - if let Some(_existing) = branching_native.branches.insert(predicate, branch) { + if let Some(_) = branching_native.branches.insert(predicate, branch) { + // thanks to the native_branch_spec_var, each batch has a distinct predicate unreachable!() - // return Err(Se::IndistinguishableBatches([index, existing.index])); } } // restore the invariant @@ -287,9 +292,7 @@ impl Connector { comm, &mut branching_native, &mut branching_proto_components, - solution_storage, - payloads_to_get, - deadline, + &mut rctx, )?; log!(cu.logger, "Committing to decision {:?}!", &decision); @@ -340,9 +343,7 @@ impl Connector { comm: &mut ConnectorCommunication, branching_native: &mut BranchingNative, branching_proto_components: &mut HashMap, - mut solution_storage: SolutionStorage, - mut payloads_to_get: Vec<(PortId, SendPayloadMsg)>, - mut deadline: Option, + rctx: &mut RoundCtx, ) -> Result { let mut already_requested_failure = false; if branching_native.branches.is_empty() { @@ -379,8 +380,7 @@ impl Connector { BranchingProtoComponent::drain_branches_to_blocked( cd, cu, - &mut solution_storage, - &mut payloads_to_get, + rctx, proto_component_id, ports, )?; @@ -406,8 +406,8 @@ impl Connector { comm.endpoint_manager.undelay_all(); 'undecided: loop { // drain payloads_to_get, sending them through endpoints / feeding them to components - log!(cu.logger, "Decision loop! have {} messages to recv", payloads_to_get.len()); - while let Some((getter, send_payload_msg)) = payloads_to_get.pop() { + log!(cu.logger, "Decision loop! have {} messages to recv", rctx.getter_buffer.len()); + while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() { assert!(cu.port_info.polarities.get(&getter).copied() == Some(Getter)); let route = cu.port_info.routes.get(&getter); log!(cu.logger, "Routing msg {:?} to {:?}", &send_payload_msg, &route); @@ -429,7 +429,7 @@ impl Connector { } Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg( cu, - &mut solution_storage, + &mut rctx.solution_storage, getter, &send_payload_msg, ), @@ -440,9 +440,8 @@ impl Connector { let proto_component_id = *proto_component_id; branching_component.feed_msg( cu, - &mut solution_storage, + rctx, proto_component_id, - &mut payloads_to_get, getter, &send_payload_msg, )?; @@ -478,7 +477,7 @@ impl Connector { // check if we have a solution yet log!(cu.logger, "Check if we have any local decisions..."); - for solution in solution_storage.iter_new_local_make_old() { + for solution in rctx.solution_storage.iter_new_local_make_old() { log!(cu.logger, "New local decision with solution {:?}...", &solution); match comm.neighborhood.parent { Some(parent) => { @@ -502,7 +501,10 @@ impl Connector { log!(cu.logger, "No decision yet. Let's recv an endpoint msg..."); { let (endpoint_index, msg) = loop { - match comm.endpoint_manager.try_recv_any_comms(&mut *cu.logger, deadline)? { + match comm + .endpoint_manager + .try_recv_any_comms(&mut *cu.logger, rctx.deadline)? + { None => { log!(cu.logger, "Reached user-defined deadling without decision..."); if let Some(parent) = comm.neighborhood.parent { @@ -515,7 +517,7 @@ impl Connector { log!(cu.logger, "As the leader, deciding on timeout"); return Ok(Decision::Failure); } - deadline = None; + rctx.deadline = None; } Some((endpoint_index, msg)) => break (endpoint_index, msg), } @@ -562,7 +564,7 @@ impl Connector { "Msg routed to getter port {:?}. Buffer for recv loop", getter, ); - payloads_to_get.push((getter, send_payload_msg)); + rctx.getter_buffer.getter_add(getter, send_payload_msg); } CommMsgContents::Suggest { suggestion } => { // only accept this control msg through a child endpoint @@ -572,7 +574,7 @@ impl Connector { // child solution contributes to local solution log!(cu.logger, "Child provided solution {:?}", &predicate); let route = Route::Endpoint { index: endpoint_index }; - solution_storage.submit_and_digest_subtree_solution( + rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.logger, route, predicate, @@ -753,13 +755,13 @@ impl BranchingProtoComponent { fn drain_branches_to_blocked( cd: CyclicDrainer, cu: &mut ConnectorUnphased, - solution_storage: &mut SolutionStorage, - payload_msg_sender: &mut impl PayloadMsgSender, + rctx: &mut RoundCtx, proto_component_id: ProtoComponentId, ports: &HashSet, ) -> Result<(), SyncError> { cd.cylic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { + untaken_choice: &mut branch.untaken_choice, logger: &mut *cu.logger, predicate: &predicate, port_info: &cu.port_info, @@ -775,6 +777,15 @@ impl BranchingProtoComponent { ); use SyncBlocker as B; match blocker { + B::NondetChoice { n } => { + let var = rctx.spec_var_stream.next(); + for val in SpecVal::iter_domain().take(n as usize) { + let pred = predicate.clone().inserted(var, val); + let mut branch_n = branch.clone(); + branch_n.untaken_choice = Some(val.0); + drainer.add_input(pred, branch_n); + } + } B::Inconsistent => { // branch is inconsistent. throw it away drop((predicate, branch)); @@ -786,7 +797,7 @@ impl BranchingProtoComponent { predicate.assigned.entry(var).or_insert(SpecVal::SILENT); } // submit solution for this component - solution_storage.submit_and_digest_subtree_solution( + rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.logger, Route::LocalComponent(ComponentId::Proto(proto_component_id)), predicate.clone(), @@ -822,7 +833,7 @@ impl BranchingProtoComponent { // keep in "unblocked" log!(cu.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - payload_msg_sender.putter_send(cu, putter, msg)?; + rctx.getter_buffer.putter_add(cu, putter, msg)?; drainer.add_input(predicate, branch); } } @@ -833,9 +844,8 @@ impl BranchingProtoComponent { fn feed_msg( &mut self, cu: &mut ConnectorUnphased, - solution_storage: &mut SolutionStorage, + rctx: &mut RoundCtx, proto_component_id: ProtoComponentId, - payload_msg_sender: &mut impl PayloadMsgSender, getter: PortId, send_payload_msg: &SendPayloadMsg, ) -> Result<(), SyncError> { @@ -898,8 +908,7 @@ impl BranchingProtoComponent { BranchingProtoComponent::drain_branches_to_blocked( cd, cu, - solution_storage, - payload_msg_sender, + rctx, proto_component_id, ports, )?; @@ -919,7 +928,12 @@ impl BranchingProtoComponent { panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); } fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { - let branch = ProtoComponentBranch { inbox: Default::default(), state, ended: false }; + let branch = ProtoComponentBranch { + inbox: Default::default(), + state, + ended: false, + untaken_choice: None, + }; Self { ports, branches: hashmap! { Predicate::default() => branch } } } } @@ -960,9 +974,6 @@ impl SolutionStorage { self.subtree_solutions.push(Default::default()) } } - // pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { - // self.new_local.iter() - // } pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { let Self { old_local, new_local, .. } = self; new_local.drain().map(move |local| { @@ -1024,15 +1035,24 @@ impl SolutionStorage { } } } -impl PayloadMsgSender for Vec<(PortId, SendPayloadMsg)> { - fn putter_send( +impl GetterBuffer { + fn len(&self) -> usize { + self.getters_and_sends.len() + } + fn pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { + self.getters_and_sends.pop() + } + fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) { + self.getters_and_sends.push((getter, msg)); + } + fn putter_add( &mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg, ) -> Result<(), SyncError> { if let Some(&getter) = cu.port_info.peers.get(&putter) { - self.push((getter, msg)); + self.getter_add(getter, msg); Ok(()) } else { Err(SyncError::MalformedStateError(MalformedStateError::GetterUnknownFor { putter })) @@ -1047,6 +1067,9 @@ impl SyncProtoContext<'_> { pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { self.inbox.get(&port) } + pub(crate) fn take_choice(&mut self) -> Option { + self.untaken_choice.take() + } } impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { fn add_input(&mut self, k: K, v: V) {