diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 09c7fb9545a9b9275a89efef775b0cb6370d58de..22834bbf2cc17c68fe44c83e4de4b5ccd0f0a8c2 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -203,6 +203,8 @@ impl Connector { ////////////////////////////////// use SyncError as Se; ////////////////////////////////// + + log!(@MARK, cu.inner.logger, "sync start {}", comm.round_index); log!( cu.inner.logger, "~~~ SYNC called with timeout {:?}; starting round {}", @@ -212,15 +214,17 @@ impl Connector { log!(@BENCH, cu.inner.logger, ""); // 1. run all proto components to Nonsync blockers - // NOTE: original components are immutable until Decision::Success + // iterate let mut branching_proto_components = - HashMap::::default(); - let mut unrun_components: Vec<(ProtoComponentId, ProtoComponent)> = - cu.proto_components.iter().map(|(&k, v)| (k, v.clone())).collect(); + HashMap::::default(); + let mut unrun_components: Vec<(ComponentId, ProtoComponent)> = cu + .proto_components + .iter() + .map(|(&proto_id, proto)| (proto_id, proto.clone())) + .collect(); log!(cu.inner.logger, "Nonsync running {} proto components...", unrun_components.len()); // drains unrun_components, and populates branching_proto_components. while let Some((proto_component_id, mut component)) = unrun_components.pop() { - // TODO coalesce fields log!( cu.inner.logger, "Nonsync running proto component with ID {:?}. {} to go after this", @@ -264,11 +268,8 @@ impl Connector { // Create temp structures needed for the synchronous phase of the round let mut rctx = RoundCtx { solution_storage: { - let n = std::iter::once(SubtreeId::LocalComponent(ComponentId::Native)); - let c = cu - .proto_components - .keys() - .map(|&id| SubtreeId::LocalComponent(ComponentId::Proto(id))); + let n = std::iter::once(SubtreeId::LocalComponent(cu.inner.native_component_id)); + let c = cu.proto_components.keys().map(|&cid| SubtreeId::LocalComponent(cid)); let e = comm .neighborhood .children @@ -350,7 +351,7 @@ impl Connector { ); rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.inner.logger, - SubtreeId::LocalComponent(ComponentId::Native), + SubtreeId::LocalComponent(cu.inner.native_component_id), predicate.clone(), ); } @@ -371,6 +372,7 @@ impl Connector { &mut branching_proto_components, &mut rctx, )?; + log!(@MARK, cu.inner.logger, "got decision!"); log!(cu.inner.logger, "Committing to decision {:?}!", &decision); log!(@BENCH, cu.inner.logger, ""); comm.endpoint_manager.udp_endpoints_round_end(&mut *cu.inner.logger, &decision)?; @@ -388,6 +390,7 @@ impl Connector { &msg, &comm.neighborhood.children ); + log!(@MARK, cu.inner.logger, "forwarding decision!"); for &child in comm.neighborhood.children.iter() { comm.endpoint_manager.send_to_comms(child, &msg)?; } @@ -423,9 +426,10 @@ impl Connector { cu: &mut ConnectorUnphased, comm: &mut ConnectorCommunication, branching_native: &mut BranchingNative, - branching_proto_components: &mut HashMap, + branching_proto_components: &mut HashMap, rctx: &mut RoundCtx, ) -> Result { + log!(@MARK, cu.inner.logger, "decide start"); let mut already_requested_failure = false; if branching_native.branches.is_empty() { log!(cu.inner.logger, "Native starts with no branches! Failure!"); @@ -443,8 +447,6 @@ impl Connector { } } } - log!(cu.inner.logger, "Done translating native batches into branches"); - let mut pcb_temps_owner = <[HashMap; 3]>::default(); let mut pcb_temps = MapTempsGuard(&mut pcb_temps_owner); let mut bn_temp_owner = >::default(); @@ -498,6 +500,7 @@ impl Connector { rctx.getter_buffer.len() ); while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() { + log!(@MARK, cu.inner.logger, "handling payload msg"); assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter)); let route = cu.inner.port_info.routes.get(&getter); log!( @@ -517,38 +520,35 @@ impl Connector { udp_endpoint_ext.outgoing_payloads.insert(predicate, payload); } Some(Route::NetEndpoint { index }) => { + log!(@MARK, cu.inner.logger, "sending payload"); let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, contents: CommMsgContents::SendPayload(send_payload_msg), }); comm.endpoint_manager.send_to_comms(*index, &msg)?; } - Some(Route::LocalComponent(ComponentId::Native)) => branching_native.feed_msg( - cu, - rctx, - getter, - &send_payload_msg, - MapTempGuard::new(&mut bn_temp_owner), - ), - Some(Route::LocalComponent(ComponentId::Proto(proto_component_id))) => { - if let Some(branching_component) = - branching_proto_components.get_mut(proto_component_id) - { - let proto_component_id = *proto_component_id; + Some(Route::LocalComponent(cid)) if *cid == cu.inner.native_component_id => { + branching_native.feed_msg( + cu, + rctx, + getter, + &send_payload_msg, + MapTempGuard::new(&mut bn_temp_owner), + ) + } + Some(Route::LocalComponent(cid)) => { + if let Some(branching_component) = branching_proto_components.get_mut(cid) { + let cid = *cid; branching_component.feed_msg( cu, rctx, - proto_component_id, + cid, getter, &send_payload_msg, pcb_temps.reborrow(), )?; if branching_component.branches.is_empty() { - log!( - cu.inner.logger, - "{:?} has become inconsistent!", - proto_component_id - ); + log!(cu.inner.logger, "{:?} has become inconsistent!", cid); if let Some(parent) = comm.neighborhood.parent { if already_requested_failure.replace_with_true() { Self::request_failure(cu, comm, parent)? @@ -577,6 +577,7 @@ impl Connector { log!(cu.inner.logger, "Check if we have any local decisions..."); for solution in rctx.solution_storage.iter_new_local_make_old() { log!(cu.inner.logger, "New local decision with solution {:?}...", &solution); + log!(@MARK, cu.inner.logger, "local solution"); match comm.neighborhood.parent { Some(parent) => { log!(cu.inner.logger, "Forwarding to my parent {:?}", parent); @@ -743,7 +744,7 @@ impl BranchingNative { &predicate, &branch.gotten ); - let subtree_id = SubtreeId::LocalComponent(ComponentId::Native); + let subtree_id = SubtreeId::LocalComponent(cu.inner.native_component_id); round_ctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.inner.logger, subtree_id, @@ -869,7 +870,7 @@ impl BranchingProtoComponent { cd: CyclicDrainer, cu: &mut ConnectorUnphased, rctx: &mut RoundCtx, - proto_component_id: ProtoComponentId, + proto_component_id: ComponentId, ports: &HashSet, ) -> Result<(), UnrecoverableSyncError> { cd.cyclic_drain(|mut predicate, mut branch, mut drainer| { @@ -918,13 +919,15 @@ impl BranchingProtoComponent { let var = cu.inner.port_info.spec_var_for(putter); let was = predicate.assigned.insert(var, SpecVal::FIRING); if was == Some(SpecVal::SILENT) { - log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); + log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", + proto_component_id, putter, var); // discard forever drop((predicate, branch)); } else { // keep in "unblocked" branch.inner.did_put_or_get.insert(putter); - log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", proto_component_id, &payload, putter, var); + log!(cu.inner.logger, "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", + proto_component_id, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; rctx.getter_buffer.putter_add(cu, putter, msg); drainer.add_input(predicate, branch); @@ -938,14 +941,15 @@ impl BranchingProtoComponent { let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); let did_fire = val == SpecVal::FIRING; if did_fire != should_have_fired { - log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); + log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}" + port, var, val, did_fire, should_have_fired); // IMPLICIT inconsistency drop((predicate, branch)); return Ok(()); } } // submit solution for this component - let subtree_id = SubtreeId::LocalComponent(ComponentId::Proto(proto_component_id)); + let subtree_id = SubtreeId::LocalComponent(proto_component_id); rctx.solution_storage.submit_and_digest_subtree_solution( &mut *cu.inner.logger, subtree_id, @@ -973,7 +977,7 @@ impl BranchingProtoComponent { &mut self, cu: &mut ConnectorUnphased, rctx: &mut RoundCtx, - proto_component_id: ProtoComponentId, + proto_component_id: ComponentId, getter: PortId, send_payload_msg: &SendPayloadMsg, pcb_temps: MapTempsGuard<'_, Predicate, ProtoComponentBranch>, @@ -1218,19 +1222,6 @@ impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { fn add_input(&mut self, k: K, v: V) { self.swap.insert(k, v); } - // fn merge_input_with V>(&mut self, k: K, v: V, mut func: F) { - // use std::collections::hash_map::Entry; - // let e = self.swap.entry(k); - // match e { - // Entry::Vacant(ev) => { - // ev.insert(v); - // } - // Entry::Occupied(mut eo) => { - // let old = eo.get_mut(); - // *old = func(v, old); - // } - // } - // } fn add_output(&mut self, k: K, v: V) { self.output.insert(k, v); } @@ -1246,15 +1237,13 @@ impl NonsyncProtoContext<'_> { &state, &moved_ports ); - assert!(self.proto_component_ports.is_subset(&moved_ports)); + println!("MOVED PORTS {:#?}. got {:#?}", &moved_ports, &self.proto_component_ports); + assert!(self.proto_component_ports.is_superset(&moved_ports)); // 2. remove ports from old component & update port->route - let new_id = self.cu_inner.id_manager.new_proto_component_id(); + let new_id = self.cu_inner.id_manager.new_component_id(); for port in moved_ports.iter() { self.proto_component_ports.remove(port); - self.cu_inner - .port_info - .routes - .insert(*port, Route::LocalComponent(ComponentId::Proto(new_id))); + self.cu_inner.port_info.routes.insert(*port, Route::LocalComponent(new_id)); } // 3. create a new component self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); @@ -1270,7 +1259,7 @@ impl NonsyncProtoContext<'_> { self.cu_inner.port_info.polarities.insert(i, Getter); self.cu_inner.port_info.peers.insert(o, i); self.cu_inner.port_info.peers.insert(i, o); - let route = Route::LocalComponent(ComponentId::Proto(self.proto_component_id)); + let route = Route::LocalComponent(self.proto_component_id); self.cu_inner.port_info.routes.insert(o, route); self.cu_inner.port_info.routes.insert(i, route); log!(