diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 22834bbf2cc17c68fe44c83e4de4b5ccd0f0a8c2..3f693584689850f084c091987960dc22a891a1b5 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -37,7 +37,6 @@ struct SolutionStorage { } #[derive(Debug)] struct BranchingProtoComponent { - ports: HashSet, branches: HashMap, } #[derive(Debug, Clone)] @@ -135,13 +134,12 @@ impl Connector { ) -> Result<&mut NativeBatch, PortOpError> { use PortOpError as Poe; let Self { unphased: cu, phased } = self; - if !cu.inner.native_ports.contains(&port) { + let info = cu.inner.current_state.port_info.get(&port).ok_or(Poe::UnknownPolarity)?; + if info.owner != cu.inner.native_component_id { return Err(Poe::PortUnavailable); } - match cu.inner.port_info.polarities.get(&port) { - Some(p) if *p == expect_polarity => {} - Some(_) => return Err(Poe::WrongPolarity), - None => return Err(Poe::UnknownPolarity), + if info.polarity != expect_polarity { + return Err(Poe::WrongPolarity); } match phased { ConnectorPhased::Setup { .. } => Err(Poe::NotConnected), @@ -217,7 +215,7 @@ impl Connector { // iterate let mut branching_proto_components = HashMap::::default(); - let mut unrun_components: Vec<(ComponentId, ProtoComponent)> = cu + let mut unrun_components: Vec<(ComponentId, ComponentState)> = cu .proto_components .iter() .map(|(&proto_id, proto)| (proto_id, proto.clone())) @@ -235,13 +233,8 @@ impl Connector { cu_inner: &mut cu.inner, proto_component_id, unrun_components: &mut unrun_components, - proto_component_ports: &mut cu - .proto_components - .get_mut(&proto_component_id) - .unwrap() // unrun_components' keys originate from proto_components - .ports, }; - let blocker = component.state.nonsync_run(&mut ctx, &cu.proto_description); + let blocker = component.nonsync_run(&mut ctx, &cu.proto_description); log!( cu.inner.logger, "proto component {:?} ran to nonsync blocker {:?}", @@ -283,7 +276,7 @@ impl Connector { ); SolutionStorage::new(subtree_id_iter) }, - spec_var_stream: cu.inner.id_manager.new_spec_var_stream(), + spec_var_stream: cu.inner.current_state.id_manager.new_spec_var_stream(), getter_buffer: Default::default(), deadline: timeout.map(|to| Instant::now() + to), }; @@ -305,16 +298,30 @@ impl Connector { let NativeBatch { to_get, to_put } = native_branch; let predicate = { let mut predicate = Predicate::default(); - // assign trues for ports that fire - let firing_ports: HashSet = - to_get.iter().chain(to_put.keys()).copied().collect(); - for &port in to_get.iter().chain(to_put.keys()) { - let var = cu.inner.port_info.spec_var_for(port); + // all firing ports have SpecVal::FIRING + let firing_iter = to_get.iter().chain(to_put.keys()).copied(); + + log!( + cu.inner.logger, + "New native with firing ports {:?}", + firing_iter.clone().collect::>() + ); + let firing_ports: HashSet = firing_iter.clone().collect(); + for port in firing_iter { + let var = cu.inner.current_state.spec_var_for(port); predicate.assigned.insert(var, SpecVal::FIRING); } - // assign falses for all silent (not firing) ports - for &port in cu.inner.native_ports.difference(&firing_ports) { - let var = cu.inner.port_info.spec_var_for(port); + // all silent ports have SpecVal::SILENT + for (port, port_info) in cu.inner.current_state.port_info.iter() { + if port_info.owner != cu.inner.native_component_id { + // not my port + continue; + } + if firing_ports.contains(port) { + // this one is FIRING + continue; + } + let var = cu.inner.current_state.spec_var_for(*port); if let Some(SpecVal::FIRING) = predicate.assigned.insert(var, SpecVal::SILENT) { log!(cu.inner.logger, "Native branch index={} contains internal inconsistency wrt. {:?}. Skipping", index, var); continue 'native_branches; @@ -339,6 +346,8 @@ impl Connector { &msg, putter ); + // sanity check + assert_eq!(Putter, cu.inner.current_state.port_info.get(&putter).unwrap().polarity); rctx.getter_buffer.putter_add(cu, putter, msg); } let branch = NativeBranch { index, gotten: Default::default(), to_get }; @@ -458,20 +467,14 @@ impl Connector { branching_proto_components.len() ); for (&proto_component_id, proto_component) in branching_proto_components.iter_mut() { - let BranchingProtoComponent { ports, branches } = proto_component; + let BranchingProtoComponent { branches } = proto_component; // must reborrow to constrain the lifetime of pcb_temps to inside the loop let (swap, pcb_temps) = pcb_temps.reborrow().split_first_mut(); let (blocked, _pcb_temps) = pcb_temps.split_first_mut(); // initially, no components have .ended==true // drain from branches --> blocked let cd = CyclicDrainer::new(branches, swap.0, blocked.0); - BranchingProtoComponent::drain_branches_to_blocked( - cd, - cu, - rctx, - proto_component_id, - ports, - )?; + BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; // swap the blocked branches back std::mem::swap(blocked.0, branches); if branches.is_empty() { @@ -500,34 +503,34 @@ impl Connector { rctx.getter_buffer.len() ); while let Some((getter, send_payload_msg)) = rctx.getter_buffer.pop() { - log!(@MARK, cu.inner.logger, "handling payload msg"); - assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter)); - let route = cu.inner.port_info.routes.get(&getter); + log!(@MARK, cu.inner.logger, "handling payload msg for getter {:?} of {:?}", getter, &send_payload_msg); + let getter_info = cu.inner.current_state.port_info.get(&getter).unwrap(); + let cid = getter_info.owner; + assert_eq!(Getter, getter_info.polarity); log!( cu.inner.logger, "Routing msg {:?} to {:?} via {:?}", &send_payload_msg, getter, - &route + &getter_info.route ); - match route { - None => log!(cu.inner.logger, "Delivery failed. Physical route unmapped!"), - Some(Route::UdpEndpoint { index }) => { + match getter_info.route { + Route::UdpEndpoint { index } => { let udp_endpoint_ext = - &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[*index]; + &mut comm.endpoint_manager.udp_endpoint_store.endpoint_exts[index]; let SendPayloadMsg { predicate, payload } = send_payload_msg; log!(cu.inner.logger, "Delivering to udp endpoint index={}", index); udp_endpoint_ext.outgoing_payloads.insert(predicate, payload); } - Some(Route::NetEndpoint { index }) => { + Route::NetEndpoint { index } => { log!(@MARK, cu.inner.logger, "sending payload"); let msg = Msg::CommMsg(CommMsg { round_index: comm.round_index, contents: CommMsgContents::SendPayload(send_payload_msg), }); - comm.endpoint_manager.send_to_comms(*index, &msg)?; + comm.endpoint_manager.send_to_comms(index, &msg)?; } - Some(Route::LocalComponent(cid)) if *cid == cu.inner.native_component_id => { + Route::LocalComponent if cid == cu.inner.native_component_id => { branching_native.feed_msg( cu, rctx, @@ -536,9 +539,9 @@ impl Connector { MapTempGuard::new(&mut bn_temp_owner), ) } - Some(Route::LocalComponent(cid)) => { - if let Some(branching_component) = branching_proto_components.get_mut(cid) { - let cid = *cid; + Route::LocalComponent => { + if let Some(branching_component) = branching_proto_components.get_mut(&cid) + { branching_component.feed_msg( cu, rctx, @@ -566,7 +569,7 @@ impl Connector { "Delivery to getter {:?} msg {:?} failed because {:?} isn't here", getter, &send_payload_msg, - proto_component_id + cid ); } } @@ -605,7 +608,7 @@ impl Connector { .endpoint_manager .try_recv_any_comms( &mut *cu.inner.logger, - &cu.inner.port_info, + &cu.inner.current_state, rctx, comm.round_index, )? { @@ -725,14 +728,14 @@ impl BranchingNative { bn_temp: MapTempGuard<'_, Predicate, NativeBranch>, ) { log!(cu.inner.logger, "feeding native getter {:?} {:?}", getter, &send_payload_msg); - assert!(cu.inner.port_info.polarities.get(&getter).copied() == Some(Getter)); + assert_eq!(Getter, cu.inner.current_state.port_info.get(&getter).unwrap().polarity); let mut draining = bn_temp; let finished = &mut self.branches; std::mem::swap(draining.0, finished); for (predicate, mut branch) in draining.drain() { log!(cu.inner.logger, "visiting native branch {:?} with {:?}", &branch, &predicate); // check if this branch expects to receive it - let var = cu.inner.port_info.spec_var_for(getter); + let var = cu.inner.current_state.spec_var_for(getter); let mut feed_branch = |branch: &mut NativeBranch, predicate: &Predicate| { branch.to_get.remove(&getter); let was = branch.gotten.insert(getter, send_payload_msg.payload.clone()); @@ -871,7 +874,6 @@ impl BranchingProtoComponent { cu: &mut ConnectorUnphased, rctx: &mut RoundCtx, proto_component_id: ComponentId, - ports: &HashSet, ) -> Result<(), UnrecoverableSyncError> { cd.cyclic_drain(|mut predicate, mut branch, mut drainer| { let mut ctx = SyncProtoContext { @@ -906,7 +908,7 @@ impl BranchingProtoComponent { } B::CouldntCheckFiring(port) => { // sanity check - let var = cu.inner.port_info.spec_var_for(port); + let var = cu.inner.current_state.spec_var_for(port); assert!(predicate.query(var).is_none()); // keep forks in "unblocked" drainer.add_input(predicate.clone().inserted(var, SpecVal::SILENT), branch.clone()); @@ -914,9 +916,9 @@ impl BranchingProtoComponent { } B::PutMsg(putter, payload) => { // sanity check - assert_eq!(Some(&Putter), cu.inner.port_info.polarities.get(&putter)); + assert_eq!(Putter, cu.inner.current_state.port_info.get(&putter).unwrap().polarity); // overwrite assignment - let var = cu.inner.port_info.spec_var_for(putter); + let var = cu.inner.current_state.spec_var_for(putter); let was = predicate.assigned.insert(var, SpecVal::FIRING); if was == Some(SpecVal::SILENT) { log!(cu.inner.logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", @@ -935,13 +937,16 @@ impl BranchingProtoComponent { } B::SyncBlockEnd => { // make concrete all variables - for port in ports.iter() { - let var = cu.inner.port_info.spec_var_for(*port); + for (port, port_info) in cu.inner.current_state.port_info.iter() { + if port_info.owner != proto_component_id { + continue; + } + let var = cu.inner.current_state.spec_var_for(*port); let should_have_fired = branch.inner.did_put_or_get.contains(port); let val = *predicate.assigned.entry(var).or_insert(SpecVal::SILENT); let did_fire = val == SpecVal::FIRING; if did_fire != should_have_fired { - log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}" + log!(cu.inner.logger, "Inconsistent wrt. port {:?} var {:?} val {:?} did_fire={}, should_have_fired={}", port, var, val, did_fire, should_have_fired); // IMPLICIT inconsistency drop((predicate, branch)); @@ -963,16 +968,6 @@ impl BranchingProtoComponent { Ok(()) }) } - // fn branch_merge_func( - // mut a: ProtoComponentBranch, - // b: &mut ProtoComponentBranch, - // ) -> ProtoComponentBranch { - // if b.ended && !a.ended { - // a.ended = true; - // std::mem::swap(&mut a, b); - // } - // a - // } fn feed_msg( &mut self, cu: &mut ConnectorUnphased, @@ -990,7 +985,7 @@ impl BranchingProtoComponent { getter, &send_payload_msg ); - let BranchingProtoComponent { branches, ports } = self; + let BranchingProtoComponent { branches } = self; let (mut unblocked, pcb_temps) = pcb_temps.split_first_mut(); let (mut blocked, pcb_temps) = pcb_temps.split_first_mut(); // partition drain from branches -> {unblocked, blocked} @@ -1038,13 +1033,7 @@ impl BranchingProtoComponent { // drain from unblocked --> blocked let (swap, _pcb_temps) = pcb_temps.split_first_mut(); let cd = CyclicDrainer::new(unblocked.0, swap.0, blocked.0); - BranchingProtoComponent::drain_branches_to_blocked( - cd, - cu, - rctx, - proto_component_id, - ports, - )?; + BranchingProtoComponent::drain_branches_to_blocked(cd, cu, rctx, proto_component_id)?; // swap the blocked branches back std::mem::swap(blocked.0, branches); log!(cu.inner.logger, "component settles down with branches: {:?}", branches.keys()); @@ -1073,19 +1062,19 @@ impl BranchingProtoComponent { } } } - fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { - let BranchingProtoComponent { ports, branches } = self; + fn collapse_with(self, solution_predicate: &Predicate) -> ComponentState { + let BranchingProtoComponent { branches } = self; for (branch_predicate, branch) in branches { if branch.ended && branch_predicate.assigns_subset(solution_predicate) { let ProtoComponentBranch { state, .. } = branch; - return ProtoComponent { state, ports }; + return state; } } panic!("ProtoComponent had no branches matching pred {:?}", solution_predicate); } - fn initial(ProtoComponent { state, ports }: ProtoComponent) -> Self { + fn initial(state: ComponentState) -> Self { let branch = ProtoComponentBranch { state, inner: Default::default(), ended: false }; - Self { ports, branches: hashmap! { Predicate::default() => branch } } + Self { branches: hashmap! { Predicate::default() => branch } } } } impl SolutionStorage { @@ -1197,7 +1186,8 @@ impl GetterBuffer { self.getters_and_sends.push((getter, msg)); } fn putter_add(&mut self, cu: &mut ConnectorUnphased, putter: PortId, msg: SendPayloadMsg) { - if let Some(&getter) = cu.inner.port_info.peers.get(&putter) { + if let Some(getter) = cu.inner.current_state.port_info.get(&putter).unwrap().peer { + log!(cu.inner.logger, "Putter add (putter:{:?} => getter:{:?})", putter, getter); self.getter_add(getter, msg); } else { log!(cu.inner.logger, "Putter {:?} has no known peer!", putter); @@ -1207,7 +1197,7 @@ impl GetterBuffer { } impl SyncProtoContext<'_> { pub(crate) fn is_firing(&mut self, port: PortId) -> Option { - let var = self.cu_inner.port_info.spec_var_for(port); + let var = self.cu_inner.current_state.spec_var_for(port); self.predicate.query(var).map(SpecVal::is_firing) } pub(crate) fn read_msg(&mut self, port: PortId) -> Option<&Payload> { @@ -1237,31 +1227,45 @@ impl NonsyncProtoContext<'_> { &state, &moved_ports ); - println!("MOVED PORTS {:#?}. got {:#?}", &moved_ports, &self.proto_component_ports); - assert!(self.proto_component_ports.is_superset(&moved_ports)); - // 2. remove ports from old component & update port->route - let new_id = self.cu_inner.id_manager.new_component_id(); + println!("MOVED PORTS {:#?}", &moved_ports); + // sanity check + for port in moved_ports.iter() { + assert_eq!( + self.proto_component_id, + self.cu_inner.current_state.port_info.get(port).unwrap().owner + ); + } + // 2. create new component + let new_cid = self.cu_inner.current_state.id_manager.new_component_id(); + self.unrun_components.push((new_cid, state)); + // 3. update ownership of moved ports for port in moved_ports.iter() { - self.proto_component_ports.remove(port); - self.cu_inner.port_info.routes.insert(*port, Route::LocalComponent(new_id)); + self.cu_inner.current_state.port_info.get_mut(port).unwrap().owner = new_cid; } // 3. create a new component - self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); } pub fn new_port_pair(&mut self) -> [PortId; 2] { // adds two new associated ports, related to each other, and exposed to the proto component - let [o, i] = - [self.cu_inner.id_manager.new_port_id(), self.cu_inner.id_manager.new_port_id()]; - self.proto_component_ports.insert(o); - self.proto_component_ports.insert(i); - // {polarity, peer, route} known. {} unknown. - self.cu_inner.port_info.polarities.insert(o, Putter); - self.cu_inner.port_info.polarities.insert(i, Getter); - self.cu_inner.port_info.peers.insert(o, i); - self.cu_inner.port_info.peers.insert(i, o); - let route = Route::LocalComponent(self.proto_component_id); - self.cu_inner.port_info.routes.insert(o, route); - self.cu_inner.port_info.routes.insert(i, route); + let mut new_cid_fn = || self.cu_inner.current_state.id_manager.new_port_id(); + let [o, i] = [new_cid_fn(), new_cid_fn()]; + self.cu_inner.current_state.port_info.insert( + o, + PortInfo { + route: Route::LocalComponent, + peer: Some(i), + polarity: Putter, + owner: self.proto_component_id, + }, + ); + self.cu_inner.current_state.port_info.insert( + i, + PortInfo { + route: Route::LocalComponent, + peer: Some(o), + polarity: Getter, + owner: self.proto_component_id, + }, + ); log!( self.cu_inner.logger, "Component {:?} port pair (out->in) {:?} -> {:?}",