diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 30f7f4b74ab9feca14968d12ab5749e9583859b1..953151bdc93532b35e4ca164763cd350ecd96ba7 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -396,6 +396,7 @@ impl Connector { let ret = match decision { Decision::Failure => { // dropping {branching_proto_components, branching_native} + log!(cu.inner.logger, "Failure with {:#?}", &rctx.solution_storage); Err(Se::RoundFailure) } Decision::Success(predicate) => { @@ -903,8 +904,8 @@ impl BranchingProtoComponent { } else { // keep in "unblocked" branch.inner.did_put_or_get.insert(putter); - log!(cu.logger(), "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", - proto_component_id, &payload, putter, var); + log!(cu.logger(), "Proto component {:?} with pred {:?} putting payload {:?} on port {:?} (using var {:?})", + proto_component_id, &predicate, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; rctx.putter_push(cu, putter, msg); drainer.add_input(predicate, branch); @@ -1033,6 +1034,7 @@ impl BranchingProtoComponent { for (k, v) in branch.inner.inbox.drain() { old.inner.inbox.insert(k, v); } + old.ended |= branch.ended; } } } @@ -1109,6 +1111,7 @@ impl SolutionStorage { let Self { subtree_solutions, new_local, old_local, .. } = self; let was_new = subtree_solutions[index].insert(predicate.clone()); if was_new { + // iterator over SETS of solutions, one for every component except `subtree_id` (me) let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local); } @@ -1168,13 +1171,6 @@ impl NonsyncProtoContext<'_> { pub fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { // called by a PROTO COMPONENT. moves its own ports. // 1. sanity check: this component owns these ports - log!( - self.logger, - "Component {:?} added new component with state {:?}, moving ports {:?}", - self.proto_component_id, - &state, - &moved_ports - ); // sanity check for port in moved_ports.iter() { assert_eq!( @@ -1184,6 +1180,14 @@ impl NonsyncProtoContext<'_> { } // 2. create new component let new_cid = self.current_state.id_manager.new_component_id(); + log!( + self.logger, + "Component {:?} added new component {:?} with state {:?}, moving ports {:?}", + self.proto_component_id, + new_cid, + &state, + &moved_ports + ); self.unrun_components.push((new_cid, state)); // 3. update ownership of moved ports for port in moved_ports.iter() { @@ -1225,8 +1229,18 @@ impl NonsyncProtoContext<'_> { } impl ProtoComponentBranch { fn feed_msg(&mut self, getter: PortId, payload: Payload) { - let was = self.inner.inbox.insert(getter, payload); - assert!(was.is_none()) + let e = self.inner.inbox.entry(getter); + use std::collections::hash_map::Entry; + match e { + Entry::Vacant(ev) => { + // new message + ev.insert(payload); + } + Entry::Occupied(eo) => { + // redundant recv. can happen as a result of a component A having two branches X and Y related by + assert_eq!(eo.get(), &payload); + } + } } } impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> {