diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 3f693584689850f084c091987960dc22a891a1b5..06e451f7436615250460f56cd70db10e6bd25d04 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -8,16 +8,6 @@ struct MapTempsGuard<'a, K, V>(&'a mut [HashMap]); // Type protecting a temporary map; At the start and end of the Guard's lifetime, self.0.is_empty() must be true struct MapTempGuard<'a, K, V>(&'a mut HashMap); -#[derive(Default)] -struct GetterBuffer { - getters_and_sends: Vec<(PortId, SendPayloadMsg)>, -} -struct RoundCtx { - solution_storage: SolutionStorage, - spec_var_stream: SpecVarStream, - getter_buffer: GetterBuffer, - deadline: Option, -} struct BranchingNative { branches: HashMap, } @@ -28,14 +18,6 @@ struct NativeBranch { to_get: HashSet, } #[derive(Debug)] -struct SolutionStorage { - old_local: HashSet, - new_local: HashSet, - // this pair acts as SubtreeId -> HashSet which is friendlier to iteration - subtree_solutions: Vec>, - subtree_id_to_index: HashMap, -} -#[derive(Debug)] struct BranchingProtoComponent { branches: HashMap, } @@ -63,6 +45,17 @@ impl ReplaceBoolTrue for bool { !was } } +impl CuUndecided for ConnectorUnphased { + fn logger(&mut self) -> &mut dyn Logger { + &mut *self.inner.logger + } + fn proto_description(&self) -> &ProtocolDescription { + &self.proto_description + } + fn native_component_id(&self) -> ComponentId { + self.inner.native_component_id + } +} //////////////// impl<'a, K, V> MapTempsGuard<'a, K, V> { @@ -96,14 +89,6 @@ impl<'a, K, V> DerefMut for MapTempGuard<'a, K, V> { self.0 } } -impl RoundCtxTrait for RoundCtx { - fn get_deadline(&self) -> &Option { - &self.deadline - } - fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) { - self.getter_buffer.getter_add(getter, msg) - } -} impl Connector { fn get_comm_mut(&mut self) -> Option<&mut ConnectorCommunication> { if let ConnectorPhased::Communication(comm) = &mut self.phased { @@ -213,6 +198,7 @@ impl Connector { // 1. run all proto components to Nonsync blockers // iterate + let current_state = cu.inner.current_state.clone(); let mut branching_proto_components = HashMap::::default(); let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu @@ -260,6 +246,7 @@ impl Connector { // Create temp structures needed for the synchronous phase of the round let mut rctx = RoundCtx { + current_state, solution_storage: { let n = std::iter::once(SubtreeId::LocalComponent(cu.inner.native_component_id)); let c = cu.proto_components.keys().map(|&cid| SubtreeId::LocalComponent(cid)); @@ -277,7 +264,7 @@ impl Connector { SolutionStorage::new(subtree_id_iter) }, spec_var_stream: cu.inner.current_state.id_manager.new_spec_var_stream(), - getter_buffer: Default::default(), + payload_inbox: Default::default(), deadline: timeout.map(|to| Instant::now() + to), }; log!(cu.inner.logger, "Round context structure initialized"); @@ -348,7 +335,7 @@ impl Connector { ); // sanity check assert_eq!(Putter, cu.inner.current_state.port_info.get(&putter).unwrap().polarity); - rctx.getter_buffer.putter_add(cu, putter, msg); + rctx.putter_push(cu, putter, msg); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; if branch.is_ended() { @@ -359,7 +346,7 @@ impl Connector { &predicate ); rctx.solution_storage.submit_and_digest_subtree_solution( - &mut *cu.inner.logger, + cu, SubtreeId::LocalComponent(cu.inner.native_component_id), predicate.clone(), ); @@ -417,6 +404,8 @@ impl Connector { .into_iter() .map(|(id, bpc)| (id, bpc.collapse_with(&predicate))), ); + // commit changes to ports and id_manager + cu.inner.current_state = rctx.current_state; log!( cu.inner.logger, "End round with (updated) component states {:?}", @@ -432,26 +421,26 @@ impl Connector { } fn sync_reach_decision( - cu: &mut ConnectorUnphased, + cu: &mut impl CuUndecided, comm: &mut ConnectorCommunication, branching_native: &mut BranchingNative, branching_proto_components: &mut HashMap, rctx: &mut RoundCtx, ) -> Result { - log!(@MARK, cu.inner.logger, "decide start"); + log!(@MARK, cu.logger(), "decide start"); let mut already_requested_failure = false; if branching_native.branches.is_empty() { - log!(cu.inner.logger, "Native starts with no branches! Failure!"); + log!(cu.logger(), "Native starts with no branches! Failure!"); match comm.neighborhood.parent { Some(parent) => { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? } else { - log!(cu.inner.logger, "Already requested failure"); + log!(cu.logger(), "Already requested failure"); } } None => { - log!(cu.inner.logger, "No parent. Deciding on failure"); + log!(cu.logger(), "No parent. Deciding on failure"); return Ok(Decision::Failure); } } @@ -462,7 +451,7 @@ impl Connector { // run all proto components to their sync blocker log!( - cu.inner.logger, + cu.logger(), "Running all {} proto components to their sync blocker...", branching_proto_components.len() ); @@ -478,37 +467,33 @@ impl Connector { // swap the blocked branches back std::mem::swap(blocked.0, branches); if branches.is_empty() { - log!(cu.inner.logger, "{:?} has become inconsistent!", proto_component_id); + log!(cu.logger(), "{:?} has become inconsistent!", proto_component_id); if let Some(parent) = comm.neighborhood.parent { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? } else { - log!(cu.inner.logger, "Already requested failure"); + log!(cu.logger(), "Already requested failure"); } } else { - log!(cu.inner.logger, "As the leader, deciding on timeout"); + log!(cu.logger(), "As the leader, deciding on timeout"); return Ok(Decision::Failure); } } } - log!(cu.inner.logger, "All proto components are blocked"); + log!(cu.logger(), "All proto components are blocked"); - log!(cu.inner.logger, "Entering decision loop..."); + log!(cu.logger(), "Entering decision loop..."); comm.endpoint_manager.undelay_all(); 'undecided: loop { // drain payloads_to_get, sending them through endpoints / feeding them to components - log!( - cu.inner.logger, - "Decision loop! have {} messages to recv", - rctx.getter_buffer.len() - ); - while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() { - log!(@MARK, cu.inner.logger, "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg); - let getter_info = cu.inner.current_state.port_info.get(&getter).unwrap(); + log!(cu.logger(), "Decision loop! have {} messages to recv", rctx.payload_inbox.len()); + while let Some((getter, send_payload_msg)) = rctx.getter_pop() { + log!(@MARK, cu.logger(), "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg); + let getter_info = rctx.current_state.port_info.get(&getter).unwrap(); let cid = getter_info.owner; assert_eq!(Getter, getter_info.polarity); log!( - cu.inner.logger, + cu.logger(), "Routing msg {:?} to {:?} via {:?}", &send_payload_msg, getter, @@ -519,26 +504,25 @@ impl Connector { let udp_endpoint_ext = &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[index]; let SendPayloadMsg { predicate, payload } = send_payload_msg; - log!(cu.inner.logger, "Delivering to udp endpoint index={}", index); + log!(cu.logger(), "Delivering to udp endpoint index={}", index); udp_endpoint_ext.outgoing_payloads.insert(predicate, payload); } Route::NetEndpoint { index } => { - log!(@MARK, cu.inner.logger, "sending payload"); + log!(@MARK, cu.logger(), "sending payload"); let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, contents: CommMsgContents::SendPayload(send_payload_msg), }); comm.endpoint_manager.send_to_comms(index, &msg)?; } - Route::LocalComponent if cid == cu.inner.native_component_id => { - branching_native.feed_msg( + Route::LocalComponent if cid == cu.native_component_id() => branching_native + .feed_msg( cu, rctx, getter, &send_payload_msg, MapTempGuard::new(&mut bn_temp_owner), - ) - } + ), Route::LocalComponent => { if let Some(branching_component) = branching_proto_components.get_mut(&cid) { @@ -551,21 +535,21 @@ impl Connector { pcb_temps.reborrow(), )?; if branching_component.branches.is_empty() { - log!(cu.inner.logger, "{:?} has become inconsistent!", cid); + log!(cu.logger(), "{:?} has become inconsistent!", cid); if let Some(parent) = comm.neighborhood.parent { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? } else { - log!(cu.inner.logger, "Already requested failure"); + log!(cu.logger(), "Already requested failure"); } } else { - log!(cu.inner.logger, "As the leader, deciding on timeout"); + log!(cu.logger(), "As the leader, deciding on timeout"); return Ok(Decision::Failure); } } } else { log!( - cu.inner.logger, + cu.logger(), "Delivery to getter {:?} msg {:?} failed because {:?} isn't here", getter, &send_payload_msg, @@ -577,13 +561,13 @@ impl Connector { } // check if we have a solution yet - log!(cu.inner.logger, "Check if we have any local decisions..."); + log!(cu.logger(), "Check if we have any local decisions..."); for solution in rctx.solution_storage.iter_new_local_make_old() { - log!(cu.inner.logger, "New local decision with solution {:?}...", &solution); - log!(@MARK, cu.inner.logger, "local solution"); + log!(cu.logger(), "New local decision with solution {:?}...", &solution); + log!(@MARK, cu.logger(), "local solution"); match comm.neighborhood.parent { Some(parent) => { - log!(cu.inner.logger, "Forwarding to my parent {:?}", parent); + log!(cu.logger(), "Forwarding to my parent {:?}", parent); let suggestion = Decision::Success(solution); let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, @@ -594,7 +578,7 @@ impl Connector { comm.endpoint_manager.send_to_comms(parent, &msg)?; } None => { - log!(cu.inner.logger, "No parent. Deciding on solution {:?}", &solution); + log!(cu.logger(), "No parent. Deciding on solution {:?}", &solution); return Ok(Decision::Success(solution)); } } @@ -602,36 +586,30 @@ impl Connector { // stuck! make progress by receiving a msg // try recv messages arriving through endpoints - log!(cu.inner.logger, "No decision yet. Let's recv an endpoint msg..."); + log!(cu.logger(), "No decision yet. Let's recv an endpoint msg..."); { - let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) = match comm - .endpoint_manager - .try_recv_any_comms( - &mut *cu.inner.logger, - &cu.inner.current_state, - rctx, - comm.round_index, - )? { - CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg), - CommRecvOk::NewPayloadMsgs => continue 'undecided, - CommRecvOk::TimeoutWithoutNew => { - log!(cu.inner.logger, "Reached user-defined deadling without decision..."); - if let Some(parent) = comm.neighborhood.parent { - if already_requested_failure.replace_with_true() { - Self::request_failure(cu, comm, parent)? + let (net_index, comm_ctrl_msg): (usize, CommCtrlMsg) = + match comm.endpoint_manager.try_recv_any_comms(cu, rctx, comm.round_index)? { + CommRecvOk::NewControlMsg { net_index, msg } => (net_index, msg), + CommRecvOk::NewPayloadMsgs => continue 'undecided, + CommRecvOk::TimeoutWithoutNew => { + log!(cu.logger(), "Reached user-defined deadling without decision..."); + if let Some(parent) = comm.neighborhood.parent { + if already_requested_failure.replace_with_true() { + Self::request_failure(cu, comm, parent)? + } else { + log!(cu.logger(), "Already requested failure"); + } } else { - log!(cu.inner.logger, "Already requested failure"); + log!(cu.logger(), "As the leader, deciding on timeout"); + return Ok(Decision::Failure); } - } else { - log!(cu.inner.logger, "As the leader, deciding on timeout"); - return Ok(Decision::Failure); + rctx.deadline = None; + continue 'undecided; } - rctx.deadline = None; - continue 'undecided; - } - }; + }; log!( - cu.inner.logger, + cu.logger(), "Received from endpoint {} ctrl msg {:?}", net_index, &comm_ctrl_msg @@ -643,30 +621,24 @@ impl Connector { match suggestion { Decision::Success(predicate) => { // child solution contributes to local solution - log!( - cu.inner.logger, - "Child provided solution {:?}", - &predicate - ); + log!(cu.logger(), "Child provided solution {:?}", &predicate); let subtree_id = SubtreeId::NetEndpoint { index: net_index }; rctx.solution_storage.submit_and_digest_subtree_solution( - &mut *cu.inner.logger, - subtree_id, - predicate, + cu, subtree_id, predicate, ); } Decision::Failure => { match comm.neighborhood.parent { None => { - log!(cu.inner.logger, "I decide on my child's failure"); + log!(cu.logger(), "I decide on my child's failure"); break 'undecided Ok(Decision::Failure); } Some(parent) => { - log!(cu.inner.logger, "Forwarding failure through my parent endpoint {:?}", parent); + log!(cu.logger(), "Forwarding failure through my parent endpoint {:?}", parent); if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? } else { - log!(cu.inner.logger, "Already requested failure"); + log!(cu.logger(), "Already requested failure"); } } } @@ -674,7 +646,7 @@ impl Connector { } } else { log!( - cu.inner.logger, + cu.logger(), "Discarding suggestion {:?} from non-child endpoint idx {:?}", &suggestion, net_index @@ -687,7 +659,7 @@ impl Connector { return Ok(decision); } else { log!( - cu.inner.logger, + cu.logger(), "Discarding announcement {:?} from non-parent endpoint idx {:?}", &decision, net_index @@ -696,15 +668,15 @@ impl Connector { } } } - log!(cu.inner.logger, "Endpoint msg recv done"); + log!(cu.logger(), "Endpoint msg recv done"); } } fn request_failure( - cu: &mut ConnectorUnphased, + cu: &mut impl CuUndecided, comm: &mut ConnectorCommunication, parent: usize, ) -> Result<(), UnrecoverableSyncError> { - log!(cu.inner.logger, "Forwarding to my parent {:?}", parent); + log!(cu.logger(), "Forwarding to my parent {:?}", parent); let suggestion = Decision::Failure; let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, @@ -721,41 +693,41 @@ impl NativeBranch { impl BranchingNative { fn feed_msg( &mut self, - cu: &mut ConnectorUnphased, - round_ctx: &mut RoundCtx, + cu: &mut impl CuUndecided, + rctx: &mut RoundCtx, getter: PortId, send_payload_msg: &SendPayloadMsg, bn_temp: MapTempGuard<'_, Predicate, NativeBranch>, ) { - log!(cu.inner.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg); - assert_eq!(Getter, cu.inner.current_state.port_info.get(&getter).unwrap().polarity); + log!(cu.logger(), "feeding native getter {:?} {:?}", getter, &send_payload_msg); + assert_eq!(Getter, rctx.current_state.port_info.get(&getter).unwrap().polarity); let mut draining = bn_temp; let finished = &mut self.branches; std::mem::swap(draining.0, finished); for (predicate, mut branch) in draining.drain() { - log!(cu.inner.logger, "visiting native branch {:?} with {:?}", &branch, &predicate); + log!(cu.logger(), "visiting native branch {:?} with {:?}", &branch, &predicate); // check if this branch expects to receive it - let var = cu.inner.current_state.spec_var_for(getter); + let var = rctx.current_state.spec_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { branch.to_get.remove(&getter); let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); assert!(was.is_none()); if branch.is_ended() { log!( - cu.inner.logger, + cu.logger(), "new native solution with {:?} is_ended() with gotten {:?}", &predicate, &branch.gotten ); - let subtree_id = SubtreeId::LocalComponent(cu.inner.native_component_id); - round_ctx.solution_storage.submit_and_digest_subtree_solution( - &mut *cu.inner.logger, + let subtree_id = SubtreeId::LocalComponent(cu.native_component_id()); + rctx.solution_storage.submit_and_digest_subtree_solution( + cu, subtree_id, predicate.clone(), ); } else { log!( - cu.inner.logger, + cu.logger(), "Fed native {:?} still has to_get {:?}", &predicate, &branch.to_get @@ -765,7 +737,7 @@ impl BranchingNative { if predicate.query(var) != Some(SpecVal::FIRING) { // optimization. Don't bother trying this branch log!( - cu.inner.logger, + cu.logger(), "skipping branch with {:?} that doesn't want the message (fastpath)", &predicate ); @@ -777,7 +749,7 @@ impl BranchingNative { Aur::Nonexistant => { // this branch does not receive the message log!( - cu.inner.logger, + cu.logger(), "skipping branch with {:?} that doesn't want the message (slowpath)", &predicate ); @@ -786,7 +758,7 @@ impl BranchingNative { Aur::Equivalent | Aur::FormerNotLatter => { // retain the existing predicate, but add this payload feed_branch(&mut branch, &predicate); - log!(cu.inner.logger, "branch pred covers it! Accept the msg"); + log!(cu.logger(), "branch pred covers it! Accept the msg"); Self::insert_branch_merging(finished, predicate, branch); } Aur::LatterNotFormer => { @@ -795,7 +767,7 @@ impl BranchingNative { let predicate2 = send_payload_msg.predicate.clone(); feed_branch(&mut branch2, &predicate2); log!( - cu.inner.logger, + cu.logger(), "payload pred {:?} covers branch pred {:?}", &predicate2, &predicate @@ -808,7 +780,7 @@ impl BranchingNative { let mut branch2 = branch.clone(); feed_branch(&mut branch2, &predicate2); log!( - cu.inner.logger, + cu.logger(), "new subsuming pred created {:?}. forking and feeding", &predicate2 ); @@ -871,19 +843,19 @@ impl BranchingNative { impl BranchingProtoComponent { fn drain_branches_to_blocked( cd: CyclicDrainer, - cu: &mut ConnectorUnphased, + cu: &mut impl CuUndecided, rctx: &mut RoundCtx, proto_component_id: ComponentId, ) -> Result<(), UnrecoverableSyncError> { cd.cyclic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { - cu_inner: &mut cu.inner, + rctx, predicate: &predicate, branch_inner: &mut branch.inner, }; - let blocker = branch.state.sync_run(&mut ctx, &cu.proto_description); + let blocker = branch.state.sync_run(&mut ctx, cu.proto_description()); log!( - cu.inner.logger, + cu.logger(), "Proto component with id {:?} branch with pred {:?} hit blocker {:?}", proto_component_id, &predicate, @@ -908,7 +880,7 @@ impl BranchingProtoComponent { } B::CouldntCheckFiring(port) => { // sanity check - let var = cu.inner.current_state.spec_var_for(port); + let var = rctx.current_state.spec_var_for(port); assert!(predicate.query(var).is_none()); // keep forks in "unblocked" drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone()); @@ -916,37 +888,37 @@ impl BranchingProtoComponent { } B::PutMsg(putter, payload) => { // sanity check - assert_eq!(Putter, cu.inner.current_state.port_info.get(&putter).unwrap().polarity); + assert_eq!(Putter, rctx.current_state.port_info.get(&putter).unwrap().polarity); // overwrite assignment - let var = cu.inner.current_state.spec_var_for(putter); + let var = rctx.current_state.spec_var_for(putter); let was = predicate.assigned.insert(var, SpecVal::FIRING); if was == Some(SpecVal::SILENT) { - log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", + log!(cu.logger(), "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); // discard forever drop((predicate, branch)); } else { // keep in "unblocked" branch.inner.did_put_or_get.insert(putter); - log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", + log!(cu.logger(), "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; - rctx.getter_buffer.putter_add(cu, putter, msg); + rctx.putter_push(cu, putter, msg); drainer.add_input(predicate, branch); } } B::SyncBlockEnd => { // make concrete all variables - for (port, port_info) in cu.inner.current_state.port_info.iter() { + for (port, port_info) in rctx.current_state.port_info.iter() { if port_info.owner != proto_component_id { continue; } - let var = cu.inner.current_state.spec_var_for(*port); + let var = rctx.current_state.spec_var_for(*port); let should_have_fired = branch.inner.did_put_or_get.contains(port); let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); let did_fire = val == SpecVal::FIRING; if did_fire != should_have_fired { - log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", + log!(cu.logger(), "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); // IMPLICIT inconsistency drop((predicate, branch)); @@ -956,7 +928,7 @@ impl BranchingProtoComponent { // submit solution for this component let subtree_id = SubtreeId::LocalComponent(proto_component_id); rctx.solution_storage.submit_and_digest_subtree_solution( - &mut *cu.inner.logger, + cu, subtree_id, predicate.clone(), ); @@ -970,16 +942,15 @@ impl BranchingProtoComponent { } fn feed_msg( &mut self, - cu: &mut ConnectorUnphased, + cu: &mut impl CuUndecided, rctx: &mut RoundCtx, proto_component_id: ComponentId, getter: PortId, send_payload_msg: &SendPayloadMsg, pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>, ) -> Result<(), UnrecoverableSyncError> { - let logger = &mut *cu.inner.logger; log!( - logger, + cu.logger(), "feeding proto component {:?} getter {:?} {:?}", proto_component_id, getter, @@ -989,30 +960,30 @@ impl BranchingProtoComponent { let (mut unblocked, pcb_temps) = pcb_temps.split_first_mut(); let (mut blocked, pcb_temps) = pcb_temps.split_first_mut(); // partition drain from branches -> {unblocked, blocked} - log!(logger, "visiting {} blocked branches...", branches.len()); + log!(cu.logger(), "visiting {} blocked branches...", branches.len()); for (predicate, mut branch) in branches.drain() { if branch.ended { - log!(logger, "Skipping ended branch with {:?}", &predicate); + log!(cu.logger(), "Skipping ended branch with {:?}", &predicate); Self::insert_branch_merging(&mut blocked, predicate, branch); continue; } use AssignmentUnionResult as Aur; - log!(logger, "visiting branch with pred {:?}", &predicate); + log!(cu.logger(), "visiting branch with pred {:?}", &predicate); match predicate.assignment_union(&send_payload_msg.predicate) { Aur::Nonexistant => { // this branch does not receive the message - log!(logger, "skipping branch"); + log!(cu.logger(), "skipping branch"); Self::insert_branch_merging(&mut blocked, predicate, branch); } Aur::Equivalent | Aur::FormerNotLatter => { // retain the existing predicate, but add this payload - log!(logger, "feeding this branch without altering its predicate"); + log!(cu.logger(), "feeding this branch without altering its predicate"); branch.feed_msg(getter, send_payload_msg.payload.clone()); Self::insert_branch_merging(&mut unblocked, predicate, branch); } Aur::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched - log!(logger, "Forking this branch, giving it the predicate of the msg"); + log!(cu.logger(), "Forking this branch, giving it the predicate of the msg"); let mut branch2 = branch.clone(); let predicate2 = send_payload_msg.predicate.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); @@ -1021,7 +992,7 @@ impl BranchingProtoComponent { } Aur::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched - log!(logger, "Forking this branch with new predicate {:?}", &predicate2); + log!(cu.logger(), "Forking this branch with new predicate {:?}", &predicate2); let mut branch2 = branch.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); Self::insert_branch_merging(&mut blocked, predicate, branch); @@ -1029,14 +1000,14 @@ impl BranchingProtoComponent { } } } - log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len()); + log!(cu.logger(), "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len()); // drain from unblocked --> blocked let (swap, _pcb_temps) = pcb_temps.split_first_mut(); let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0); BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; // swap the blocked branches back std::mem::swap(blocked.0, branches); - log!(cu.inner.logger, "component settles down with branches: {:?}", branches.keys()); + log!(cu.logger(), "component settles down with branches: {:?}", branches.keys()); Ok(()) } fn insert_branch_merging( @@ -1123,11 +1094,11 @@ impl SolutionStorage { } pub(crate) fn submit_and_digest_subtree_solution( &mut self, - logger: &mut dyn Logger, + cu: &mut impl CuUndecided, subtree_id: SubtreeId, predicate: Predicate, ) { - log!(logger, "++ new component solution {:?} {:?}", subtree_id, &predicate); + log!(cu.logger(), "++ new component solution {:?} {:?}", subtree_id, &predicate); let index = self.subtree_id_to_index[&subtree_id]; let left = 0..index; let right = (index + 1)..self.subtree_solutions.len(); @@ -1136,17 +1107,11 @@ impl SolutionStorage { let was_new = subtree_solutions[index].insert(predicate.clone()); if was_new { let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); - Self::elaborate_into_new_local_rec( - logger, - predicate, - set_visitor, - old_local, - new_local, - ); + Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local); } } fn elaborate_into_new_local_rec<'a, 'b>( - logger: &mut dyn Logger, + cu: &mut impl CuUndecided, partial: Predicate, mut set_visitor: impl Iterator> + Clone, old_local: &'b HashSet, @@ -1157,7 +1122,7 @@ impl SolutionStorage { for pred in set.iter() { if let Some(elaborated) = pred.union_with(&partial) { Self::elaborate_into_new_local_rec( - logger, + cu, elaborated, set_visitor.clone(), old_local, @@ -1169,35 +1134,15 @@ impl SolutionStorage { // recursive stop condition. `partial` is a local subtree solution if !old_local.contains(&partial) { // ... and it hasn't been found before - log!(logger, "storing NEW LOCAL SOLUTION {:?}", &partial); + log!(cu.logger(), "storing NEW LOCAL SOLUTION {:?}", &partial); new_local.insert(partial); } } } } -impl GetterBuffer { - fn len(&self) -> usize { - self.getters_and_sends.len() - } - fn pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { - self.getters_and_sends.pop() - } - fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg) { - self.getters_and_sends.push((getter, msg)); - } - fn putter_add(&mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) { - if let Some(getter) = cu.inner.current_state.port_info.get(&putter).unwrap().peer { - log!(cu.inner.logger, "Putter add (putter:{:?} => getter:{:?})", putter, getter); - self.getter_add(getter, msg); - } else { - log!(cu.inner.logger, "Putter {:?} has no known peer!", putter); - panic!("Putter {:?} has no known peer!"); - } - } -} impl SyncProtoContext<'_> { pub(crate) fn is_firing(&mut self, port: PortId) -> Option { - let var = self.cu_inner.current_state.spec_var_for(port); + let var = self.rctx.current_state.spec_var_for(port); self.predicate.query(var).map(SpecVal::is_firing) } pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { @@ -1305,3 +1250,46 @@ impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { Ok(()) } } + +// struct ConnectorComm { +// logger: Box, +// pd: Arc, +// current_state: CurrentState, +// components: HashMap, +// ports: HashMap, +// endpoint_manager: EndpointManager, +// neighborhood: Neighborhood, +// native_batches: Vec, +// round_result: Result, SyncError>, +// } + +// struct RoundTemp<'a> { +// deadline: Option, +// msg_buf: Vec<(PortId, SendPayloadMsg)>, +// solution_storage: SolutionStorage, +// override_ports: HashMap, +// spec_var_stream: SpecVarStream, +// branching_proto: HashMap, +// branching_native: BranchingNative, +// comm: &'a mut ConnectorComm, +// } +// impl ConnectorComm { +// fn sync(&mut self, deadline: Option) -> Result { +// RoundTemp { +// msg_buf: Default::default(), +// deadline: todo!(), +// solution_storage: todo!(), +// override_ports: Default::default(), +// spec_var_stream: todo!(), +// branching_proto: Default::default(), +// branching_native: todo!(), +// comm: self, +// } +// .sync() +// } +// } +// impl RoundTemp<'_> { +// fn sync(&mut self) -> Result { +// todo!() +// } +// } diff --git a/src/runtime/endpoints.rs b/src/runtime/endpoints.rs index 054123fb03ba031c5ad1b8541323b83139e89c29..62dbd2cc59e0a488b456401011ed0130ca21342a 100644 --- a/src/runtime/endpoints.rs +++ b/src/runtime/endpoints.rs @@ -146,22 +146,21 @@ impl EndpointManager { // drops all Setup messages, // buffers all future round messages, // drops all previous round messages, - // enqueues all current round SendPayload messages using round_ctx.getter_add + // enqueues all current round SendPayload messages using rctx.getter_push // returns the first comm_ctrl_msg encountered // only polls until SOME message is enqueued pub(super) fn try_recv_any_comms( &mut self, - logger: &mut dyn Logger, - current_state: &CurrentState, - round_ctx: &mut impl RoundCtxTrait, + cu: &mut impl CuUndecided, + rctx: &mut RoundCtx, round_index: usize, ) -> Result { /////////////////////////////////////////// impl EndpointManager { fn handle_msg( &mut self, - logger: &mut dyn Logger, - round_ctx: &mut impl RoundCtxTrait, + cu: &mut impl CuUndecided, + rctx: &mut RoundCtx, net_index: usize, msg: Msg, round_index: usize, @@ -173,7 +172,7 @@ impl EndpointManager { Ordering::Equal => comm_msg.contents, Ordering::Less => { log!( - logger, + cu.logger(), "We are in round {}, but msg is for round {}. Discard", comm_msg.round_index, round_index, @@ -182,7 +181,7 @@ impl EndpointManager { } Ordering::Greater => { log!( - logger, + cu.logger(), "We are in round {}, but msg is for round {}. Buffer", comm_msg.round_index, round_index, @@ -197,7 +196,7 @@ impl EndpointManager { CommMsgContents::SendPayload(send_payload_msg) => { let getter = self.net_endpoint_store.endpoint_exts[net_index].getter_for_incoming; - round_ctx.getter_add(getter, send_payload_msg); + rctx.getter_push(getter, send_payload_msg); *some_message_enqueued = true; None } @@ -209,23 +208,18 @@ impl EndpointManager { let mut some_message_enqueued = false; // try yield undelayed net message while let Some((net_index, msg)) = self.undelayed_messages.pop() { - if let Some((net_index, msg)) = self.handle_msg( - logger, - round_ctx, - net_index, - msg, - round_index, - &mut some_message_enqueued, - ) { + if let Some((net_index, msg)) = + self.handle_msg(cu, rctx, net_index, msg, round_index, &mut some_message_enqueued) + { return Ok(CommRecvOk::NewControlMsg { net_index, msg }); } } loop { // try receive a net message - while let Some((net_index, msg)) = self.try_recv_undrained_net(logger)? { + while let Some((net_index, msg)) = self.try_recv_undrained_net(cu.logger())? { if let Some((net_index, msg)) = self.handle_msg( - logger, - round_ctx, + cu, + rctx, net_index, msg, round_index, @@ -243,9 +237,9 @@ impl EndpointManager { self.udp_endpoint_store.polled_undrained.insert(index); if !ee.received_this_round { let payload = Payload::from(&recv_buffer[..bytes_written]); - let port_spec_var = current_state.spec_var_for(ee.getter_for_incoming); + let port_spec_var = rctx.current_state.spec_var_for(ee.getter_for_incoming); let predicate = Predicate::singleton(port_spec_var, SpecVal::FIRING); - round_ctx.getter_add( + rctx.getter_push( ee.getter_for_incoming, SendPayloadMsg { payload, predicate }, ); @@ -260,7 +254,7 @@ impl EndpointManager { return Ok(CommRecvOk::NewPayloadMsgs); } // poll if time remains - match self.poll_and_populate(logger, round_ctx.get_deadline()) { + match self.poll_and_populate(cu.logger(), &rctx.deadline) { Ok(()) => {} // continue looping Err(Pape::Timeout) => return Ok(CommRecvOk::TimeoutWithoutNew), Err(Pape::PollFailed) => return Err(Use::PollFailed), diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 4dfb9af6d3a5b20a47842fefa838eac1b1f8679d..e83c3598d94397628c62c52f4bade45a3ee44ee1 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -29,7 +29,7 @@ pub struct VecLogger(ConnectorId, Vec); pub struct DummyLogger; #[derive(Debug)] pub struct FileLogger(ConnectorId, std::fs::File); -#[derive(Debug)] +#[derive(Debug, Clone)] struct CurrentState { port_info: HashMap, id_manager: IdManager, @@ -40,9 +40,10 @@ pub(crate) struct NonsyncProtoContext<'a> { proto_component_id: ComponentId, // KEY in id->component map } pub(crate) struct SyncProtoContext<'a> { - cu_inner: &'a mut ConnectorUnphasedInner, // persists between rounds + rctx: &'a RoundCtx, + // cu: &'a mut dyn CuUndecided, branch_inner: &'a mut ProtoComponentBranchInner, // sub-structure of component branch - predicate: &'a Predicate, // KEY in pred->branch map + predicate: &'a Predicate, // KEY in pred->branch map } #[derive(Default, Debug, Clone)] struct ProtoComponentBranchInner { @@ -175,7 +176,7 @@ struct Neighborhood { parent: Option, children: VecSet, } -#[derive(Debug)] +#[derive(Debug, Clone)] struct IdManager { connector_id: ConnectorId, port_suffix_stream: U32Stream, @@ -249,6 +250,26 @@ enum ConnectorPhased { struct Predicate { assigned: BTreeMap, } +#[derive(Debug)] +struct SolutionStorage { + old_local: HashSet, + new_local: HashSet, + // this pair acts as SubtreeId -> HashSet which is friendlier to iteration + subtree_solutions: Vec>, + subtree_id_to_index: HashMap, +} +struct RoundCtx { + solution_storage: SolutionStorage, + spec_var_stream: SpecVarStream, + payload_inbox: Vec<(PortId, SendPayloadMsg)>, + deadline: Option, + current_state: CurrentState, +} +trait CuUndecided { + fn logger(&mut self) -> &mut dyn Logger; + fn proto_description(&self) -> &ProtocolDescription; + fn native_component_id(&self) -> ComponentId; +} #[derive(Debug, Default)] struct NativeBatch { // invariant: putters' and getters' polarities respected @@ -261,10 +282,6 @@ enum TokenTarget { UdpEndpoint { index: usize }, Waker, } -trait RoundCtxTrait { - fn get_deadline(&self) -> &Option; - fn getter_add(&mut self, getter: PortId, msg: SendPayloadMsg); -} enum CommRecvOk { TimeoutWithoutNew, NewPayloadMsgs, @@ -653,3 +670,21 @@ impl Debug for UdpInBuffer { write!(f, "UdpInBuffer") } } + +impl RoundCtx { + fn getter_pop(&mut self) -> Option<(PortId, SendPayloadMsg)> { + self.payload_inbox.pop() + } + fn getter_push(&mut self, getter: PortId, msg: SendPayloadMsg) { + self.payload_inbox.push((getter, msg)); + } + fn putter_push(&mut self, cu: &mut impl CuUndecided, putter: PortId, msg: SendPayloadMsg) { + if let Some(getter) = self.current_state.port_info.get(&putter).unwrap().peer { + log!(cu.logger(), "Putter add (putter:{:?} => getter:{:?})", putter, getter); + self.getter_push(getter, msg); + } else { + log!(cu.logger(), "Putter {:?} has no known peer!", putter); + panic!("Putter {:?} has no known peer!"); + } + } +}