diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index b4991f68b89f9b470feb528b563867ed5c1436b1..87f4d4f5c6f37d018cfbcf3056c3dc17705d9a79 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -39,62 +39,6 @@ struct CyclicDrainInner<'a, K: Eq + Hash, V> { } //////////////// -impl NonsyncProtoContext<'_> { - pub fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { - // called by a PROTO COMPONENT. moves its own ports. - // 1. sanity check: this component owns these ports - log!( - self.logger, - "Component {:?} added new component with state {:?}, moving ports {:?}", - self.proto_component_id, - &state, - &moved_ports - ); - assert!(self.proto_component_ports.is_subset(&moved_ports)); - // 2. remove ports from old component & update port->route - let new_id = self.id_manager.new_proto_component_id(); - for port in moved_ports.iter() { - self.proto_component_ports.remove(port); - self.port_info - .routes - .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); - } - // 3. create a new component - self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); - } - pub fn new_port_pair(&mut self) -> [PortId; 2] { - // adds two new associated ports, related to each other, and exposed to the proto component - let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; - self.proto_component_ports.insert(o); - self.proto_component_ports.insert(i); - // {polarity, peer, route} known. {} unknown. - self.port_info.polarities.insert(o, Putter); - self.port_info.polarities.insert(i, Getter); - self.port_info.peers.insert(o, i); - self.port_info.peers.insert(i, o); - let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id)); - self.port_info.routes.insert(o, route); - self.port_info.routes.insert(i, route); - log!( - self.logger, - "Component {:?} port pair (out->in) {:?} -> {:?}", - self.proto_component_id, - o, - i - ); - [o, i] - } -} -impl SyncProtoContext<'_> { - pub fn is_firing(&mut self, port: PortId) -> Option { - let var = self.port_info.firing_var_for(port); - self.predicate.query(var) - } - pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { - self.inbox.get(&port) - } -} - impl Connector { pub fn gotten(&mut self, port: PortId) -> Result<&Payload, GottenError> { use GottenError::*; @@ -634,7 +578,7 @@ impl BranchingNative { } fn collapse_with(self, solution_predicate: &Predicate) -> (usize, HashMap) { for (branch_predicate, branch) in self.branches { - if branch_predicate.satisfies(solution_predicate) { + if solution_predicate.satisfies(&branch_predicate) { let NativeBranch { index, gotten, .. } = branch; return (index, gotten); } @@ -642,40 +586,6 @@ impl BranchingNative { panic!("Native had no branches matching pred {:?}", solution_predicate); } } - -impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { - fn new( - input: &'a mut HashMap, - swap: &'a mut HashMap, - output: &'a mut HashMap, - ) -> Self { - Self { input, inner: CyclicDrainInner { swap, output } } - } - fn cylic_drain(self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>)) { - let Self { input, inner: CyclicDrainInner { swap, output } } = self; - // assert!(swap.is_empty()); - while !input.is_empty() { - for (k, v) in input.drain() { - func(k, v, CyclicDrainInner { swap, output }) - } - } - } -} -impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { - fn add_input(&mut self, k: K, v: V) { - self.swap.insert(k, v); - } - fn add_output(&mut self, k: K, v: V) { - self.output.insert(k, v); - } -} - -impl ProtoComponentBranch { - fn feed_msg(&mut self, getter: PortId, payload: Payload) { - let was = self.inbox.insert(getter, payload); - assert!(was.is_none()) - } -} impl BranchingProtoComponent { fn drain_branches_to_blocked( cd: CyclicDrainer, @@ -693,7 +603,6 @@ impl BranchingProtoComponent { logger, predicate: &predicate, port_info, - proto_component_id, inbox: &branch.inbox, }; let blocker = branch.state.sync_run(&mut ctx, proto_description); @@ -743,7 +652,6 @@ impl BranchingProtoComponent { assert_eq!(Some(&Putter), port_info.polarities.get(&putter)); // overwrite assignment let var = port_info.firing_var_for(putter); - let was = predicate.assigned.insert(var, true); if was == Some(false) { log!(logger, "Proto component {:?} tried to PUT on port {:?} when pred said var {:?}==Some(false). inconsistent!", proto_component_id, putter, var); @@ -773,24 +681,36 @@ impl BranchingProtoComponent { getter: PortId, send_payload_msg: SendPayloadMsg, ) { + log!( + logger, + "feeding proto component {:?} getter {:?} {:?}", + proto_component_id, + getter, + &send_payload_msg + ); let BranchingProtoComponent { branches, ports } = self; let mut unblocked = HashMap::default(); let mut blocked = HashMap::default(); // partition drain from branches -> {unblocked, blocked} + log!(logger, "visiting {} blocked branches...", branches.len()); for (predicate, mut branch) in branches.drain() { use CommonSatResult as Csr; + log!(logger, "visiting branch with pred {:?}", &predicate); match predicate.common_satisfier(&send_payload_msg.predicate) { Csr::Nonexistant => { // this branch does not receive the message + log!(logger, "skipping branch"); blocked.insert(predicate, branch); } Csr::Equivalent | Csr::FormerNotLatter => { // retain the existing predicate, but add this payload + log!(logger, "feeding this branch without altering its predicate"); branch.feed_msg(getter, send_payload_msg.payload.clone()); unblocked.insert(predicate, branch); } Csr::LatterNotFormer => { // fork branch, give fork the message and payload predicate. original branch untouched + log!(logger, "Forking this branch, giving it the predicate of the msg"); let mut branch2 = branch.clone(); let predicate2 = send_payload_msg.predicate.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); @@ -799,6 +719,7 @@ impl BranchingProtoComponent { } Csr::New(predicate2) => { // fork branch, give fork the message and the new predicate. original branch untouched + log!(logger, "Forking this branch with new predicate {:?}", &predicate2); let mut branch2 = branch.clone(); branch2.feed_msg(getter, send_payload_msg.payload.clone()); blocked.insert(predicate, branch); @@ -806,6 +727,7 @@ impl BranchingProtoComponent { } } } + log!(logger, "blocked {:?} unblocked {:?}", blocked.len(), unblocked.len()); // drain from unblocked --> blocked let mut swap = HashMap::default(); let cd = CyclicDrainer::new(&mut unblocked, &mut swap, &mut blocked); @@ -821,6 +743,7 @@ impl BranchingProtoComponent { ); // swap the blocked branches back std::mem::swap(&mut blocked, branches); + log!(logger, "component settles down with branches: {:?}", branches.keys()); } fn collapse_with(self, solution_predicate: &Predicate) -> ProtoComponent { let BranchingProtoComponent { ports, branches } = self; @@ -874,11 +797,9 @@ impl SolutionStorage { self.subtree_solutions.push(Default::default()) } } - - pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { - self.new_local.iter() - } - + // pub(crate) fn peek_new_locals(&self) -> impl Iterator + '_ { + // self.new_local.iter() + // } pub(crate) fn iter_new_local_make_old(&mut self) -> impl Iterator + '_ { let Self { old_local, new_local, .. } = self; new_local.drain().map(move |local| { @@ -886,7 +807,6 @@ impl SolutionStorage { local }) } - pub(crate) fn submit_and_digest_subtree_solution( &mut self, logger: &mut dyn Logger, @@ -911,7 +831,6 @@ impl SolutionStorage { ); } } - fn elaborate_into_new_local_rec<'a, 'b>( logger: &mut dyn Logger, partial: Predicate, @@ -942,3 +861,92 @@ impl SolutionStorage { } } } +impl SyncProtoContext<'_> { + pub fn is_firing(&mut self, port: PortId) -> Option { + let var = self.port_info.firing_var_for(port); + self.predicate.query(var) + } + pub fn read_msg(&mut self, port: PortId) -> Option<&Payload> { + self.inbox.get(&port) + } +} +impl<'a, K: Eq + Hash, V> CyclicDrainInner<'a, K, V> { + fn add_input(&mut self, k: K, v: V) { + self.swap.insert(k, v); + } + fn add_output(&mut self, k: K, v: V) { + self.output.insert(k, v); + } +} +impl NonsyncProtoContext<'_> { + pub fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { + // called by a PROTO COMPONENT. moves its own ports. + // 1. sanity check: this component owns these ports + log!( + self.logger, + "Component {:?} added new component with state {:?}, moving ports {:?}", + self.proto_component_id, + &state, + &moved_ports + ); + assert!(self.proto_component_ports.is_subset(&moved_ports)); + // 2. remove ports from old component & update port->route + let new_id = self.id_manager.new_proto_component_id(); + for port in moved_ports.iter() { + self.proto_component_ports.remove(port); + self.port_info + .routes + .insert(*port, Route::LocalComponent(LocalComponentId::Proto(new_id))); + } + // 3. create a new component + self.unrun_components.push((new_id, ProtoComponent { state, ports: moved_ports })); + } + pub fn new_port_pair(&mut self) -> [PortId; 2] { + // adds two new associated ports, related to each other, and exposed to the proto component + let [o, i] = [self.id_manager.new_port_id(), self.id_manager.new_port_id()]; + self.proto_component_ports.insert(o); + self.proto_component_ports.insert(i); + // {polarity, peer, route} known. {} unknown. + self.port_info.polarities.insert(o, Putter); + self.port_info.polarities.insert(i, Getter); + self.port_info.peers.insert(o, i); + self.port_info.peers.insert(i, o); + let route = Route::LocalComponent(LocalComponentId::Proto(self.proto_component_id)); + self.port_info.routes.insert(o, route); + self.port_info.routes.insert(i, route); + log!( + self.logger, + "Component {:?} port pair (out->in) {:?} -> {:?}", + self.proto_component_id, + o, + i + ); + [o, i] + } +} +impl ProtoComponentBranch { + fn feed_msg(&mut self, getter: PortId, payload: Payload) { + let was = self.inbox.insert(getter, payload); + assert!(was.is_none()) + } +} + +impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { + fn new( + input: &'a mut HashMap, + swap: &'a mut HashMap, + output: &'a mut HashMap, + ) -> Self { + Self { input, inner: CyclicDrainInner { swap, output } } + } + fn cylic_drain(self, mut func: impl FnMut(K, V, CyclicDrainInner<'_, K, V>)) { + let Self { input, inner: CyclicDrainInner { swap, output } } = self; + // assert!(swap.is_empty()); + while !input.is_empty() { + for (k, v) in input.drain() { + func(k, v, CyclicDrainInner { swap, output }) + } + std::mem::swap(input, swap); + } + } +}