diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 91555a7f9e34e40ad9775a3b6ffa30fa23289cf7..f68f736670de9d4e1aa4d94d95ecf704878b1fa0 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -152,7 +152,7 @@ impl Connector { ) -> Result<&mut NativeBatch, PortOpError> { use PortOpError as Poe; let Self { unphased: cu, phased } = self; - let info = cu.current_state.port_info.get(&port).ok_or(Poe::UnknownPolarity)?; + let info = cu.ips.port_info.get(&port).ok_or(Poe::UnknownPolarity)?; if info.owner != cu.native_component_id { return Err(Poe::PortUnavailable); } @@ -261,7 +261,7 @@ impl Connector { // while kicking off the branching of components until the set of // components entering their synchronous block is finalized in `branching_proto_components`. // This is the last time cu's components and ports are accessed until the round is decided. - let mut current_state = cu.current_state.clone(); + let mut ips = cu.ips.clone(); let mut branching_proto_components = HashMap::::default(); let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu @@ -282,7 +282,7 @@ impl Connector { ); let (logger, proto_description) = cu.logger_and_protocol_description(); let mut ctx = NonsyncProtoContext { - current_state: &mut current_state, + ips: &mut ips, logger, proto_component_id, unrun_components: &mut unrun_components, @@ -312,7 +312,7 @@ impl Connector { // Create temporary structures needed for the synchronous phase of the round let mut rctx = RoundCtx { - current_state, // already used previously, now moved into RoundCtx + ips, // already used previously, now moved into RoundCtx solution_storage: { let subtree_id_iter = { // Create an iterator over the identifiers of this @@ -337,7 +337,7 @@ impl Connector { ); SolutionStorage::new(subtree_id_iter) }, - spec_var_stream: cu.current_state.id_manager.new_spec_var_stream(), + spec_var_stream: cu.ips.id_manager.new_spec_var_stream(), payload_inbox: Default::default(), // buffer for in-memory payloads to be handled deadline: timeout.map(|to| Instant::now() + to), }; @@ -376,16 +376,16 @@ impl Connector { ); let firing_ports: HashSet = firing_iter.clone().collect(); for port in firing_iter { - let var = cu.current_state.spec_var_for(port); + let var = cu.ips.spec_var_for(port); predicate.assigned.insert(var, SpecVal::FIRING); } // all silent ports have SpecVal::SILENT - for port in cu.current_state.ports_owned_by(cu.native_component_id) { + for port in cu.ips.ports_owned_by(cu.native_component_id) { if firing_ports.contains(port) { // this one is FIRING continue; } - let var = cu.current_state.spec_var_for(*port); + let var = cu.ips.spec_var_for(*port); if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { log!(cu.logger(), "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); continue 'native_branches; @@ -406,7 +406,7 @@ impl Connector { putter ); // sanity check - assert_eq!(Putter, cu.current_state.port_info.get(&putter).unwrap().polarity); + assert_eq!(Putter, cu.ips.port_info.get(&putter).unwrap().polarity); rctx.putter_push(cu, putter, msg); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; @@ -481,7 +481,7 @@ impl Connector { .map(|(cid, bpc)| (cid, bpc.collapse_with(&predicate))), ); // commit changes to ports and id_manager - cu.current_state = rctx.current_state; + cu.ips = rctx.ips; log!( cu.logger, "End round with (updated) component states {:?}", @@ -580,7 +580,7 @@ impl Connector { log!(cu.logger(), "Decision loop! have {} messages to recv", rctx.payload_inbox.len()); while let Some((getter, send_payload_msg)) = rctx.getter_pop() { log!(@MARK, cu.logger(), "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg); - let getter_info = rctx.current_state.port_info.get(&getter).unwrap(); + let getter_info = rctx.ips.port_info.get(&getter).unwrap(); let cid = getter_info.owner; // the id of the component owning `getter` port assert_eq!(Getter, getter_info.polarity); // sanity check log!( @@ -832,7 +832,7 @@ impl BranchingNative { bn_temp: MapTempGuard<'_, Predicate, NativeBranch>, ) { log!(cu.logger(), "feeding native getter {:?} {:?}", getter, &send_payload_msg); - assert_eq!(Getter, rctx.current_state.port_info.get(&getter).unwrap().polarity); + assert_eq!(Getter, rctx.ips.port_info.get(&getter).unwrap().polarity); let mut draining = bn_temp; let finished = &mut self.branches; std::mem::swap(draining.0, finished); @@ -840,7 +840,7 @@ impl BranchingNative { // consistent with that of the received message. for (predicate, mut branch) in draining.drain() { log!(cu.logger(), "visiting native branch {:?} with {:?}", &branch, &predicate); - let var = rctx.current_state.spec_var_for(getter); + let var = rctx.ips.spec_var_for(getter); if predicate.query(var) != Some(SpecVal::FIRING) { // optimization. Don't bother trying this branch, // because the resulting branch would have an inconsistent predicate. @@ -1046,7 +1046,7 @@ impl BranchingProtoComponent { } B::CouldntCheckFiring(port) => { // sanity check: `CouldntCheckFiring` returned IFF the variable is speculatively assigned - let var = rctx.current_state.spec_var_for(port); + let var = rctx.ips.spec_var_for(port); assert!(predicate.query(var).is_none()); // speculate on the two possible values of `var`. Schedule both branches to be rerun. drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone()); @@ -1054,9 +1054,9 @@ impl BranchingProtoComponent { } B::PutMsg(putter, payload) => { // sanity check: The given port indeed has `Putter` polarity - assert_eq!(Putter, rctx.current_state.port_info.get(&putter).unwrap().polarity); + assert_eq!(Putter, rctx.ips.port_info.get(&putter).unwrap().polarity); // assign FIRING to this port's associated firing variable - let var = rctx.current_state.spec_var_for(putter); + let var = rctx.ips.spec_var_for(putter); let was = predicate.assigned.insert(var, SpecVal::FIRING); if was == Some(SpecVal::SILENT) { // Discard the branch, as it clearly has contradictory requirements for this value. @@ -1079,8 +1079,8 @@ impl BranchingProtoComponent { B::SyncBlockEnd => { // This branch reached the end of it's synchronous block // assign all variables of owned ports that DIDN'T fire to SILENT - for port in rctx.current_state.ports_owned_by(proto_component_id) { - let var = rctx.current_state.spec_var_for(*port); + for port in rctx.ips.ports_owned_by(proto_component_id) { + let var = rctx.ips.spec_var_for(*port); let actually_exchanged = branch.inner.did_put_or_get.contains(port); let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); let speculated_to_fire = val == SpecVal::FIRING; @@ -1362,11 +1362,11 @@ impl NonsyncProtoContext<'_> { for port in moved_ports.iter() { assert_eq!( self.proto_component_id, - self.current_state.port_info.get(port).unwrap().owner + self.ips.port_info.get(port).unwrap().owner ); } // Create the new component, and schedule it to be run - let new_cid = self.current_state.id_manager.new_component_id(); + let new_cid = self.ips.id_manager.new_component_id(); log!( self.logger, "Component {:?} added new component {:?} with state {:?}, moving ports {:?}", @@ -1378,7 +1378,7 @@ impl NonsyncProtoContext<'_> { self.unrun_components.push((new_cid, state)); // Update the ownership of the moved ports for port in moved_ports.iter() { - self.current_state.port_info.get_mut(port).unwrap().owner = new_cid; + self.ips.port_info.get_mut(port).unwrap().owner = new_cid; } } @@ -1386,9 +1386,9 @@ impl NonsyncProtoContext<'_> { // creating a new port-pair connected by an memory channel pub(crate) fn new_port_pair(&mut self) -> [PortId; 2] { // adds two new associated ports, related to each other, and exposed to the proto component - let mut new_cid_fn = || self.current_state.id_manager.new_port_id(); + let mut new_cid_fn = || self.ips.id_manager.new_port_id(); let [o, i] = [new_cid_fn(), new_cid_fn()]; - self.current_state.port_info.insert( + self.ips.port_info.insert( o, PortInfo { route: Route::LocalComponent, @@ -1397,7 +1397,7 @@ impl NonsyncProtoContext<'_> { owner: self.proto_component_id, }, ); - self.current_state.port_info.insert( + self.ips.port_info.insert( i, PortInfo { route: Route::LocalComponent, @@ -1420,7 +1420,7 @@ impl SyncProtoContext<'_> { // The component calls the runtime back, inspecting whether it's associated // preidcate has already determined a (speculative) value for the given port's firing variable. pub(crate) fn is_firing(&mut self, port: PortId) -> Option { - let var = self.rctx.current_state.spec_var_for(port); + let var = self.rctx.ips.spec_var_for(port); self.predicate.query(var).map(SpecVal::is_firing) }