diff --git a/src/macros.rs b/src/macros.rs index a377585f4426a71b9c85d6181a502652a69e7891..efe5ce62c31163a0cef6c8340f05e14de173f927 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -9,16 +9,16 @@ macro_rules! log { // } }}; (@MARK, $logger:expr, $($arg:tt)*) => {{ - if let Some(w) = $logger.line_writer() { - let _ = writeln!(w, $($arg)*); - } + // if let Some(w) = $logger.line_writer() { + // let _ = writeln!(w, $($arg)*); + // } }}; (@ENDPT, $logger:expr, $($arg:tt)*) => {{ // ignore }}; ($logger:expr, $($arg:tt)*) => {{ - if let Some(w) = $logger.line_writer() { - let _ = writeln!(w, $($arg)*); - } + // if let Some(w) = $logger.line_writer() { + // let _ = writeln!(w, $($arg)*); + // } }}; } diff --git a/src/runtime/communication.rs b/src/runtime/communication.rs index 30f7f4b74ab9feca14968d12ab5749e9583859b1..953151bdc93532b35e4ca164763cd350ecd96ba7 100644 --- a/src/runtime/communication.rs +++ b/src/runtime/communication.rs @@ -396,6 +396,7 @@ impl Connector { let ret = match decision { Decision::Failure => { // dropping {branching_proto_components, branching_native} + log!(cu.inner.logger, "Failure with {:#?}", &rctx.solution_storage); Err(Se::RoundFailure) } Decision::Success(predicate) => { @@ -903,8 +904,8 @@ impl BranchingProtoComponent { } else { // keep in "unblocked" branch.inner.did_put_or_get.insert(putter); - log!(cu.logger(), "Proto component {:?} putting payload {:?} on port {:?} (using var {:?})", - proto_component_id, &payload, putter, var); + log!(cu.logger(), "Proto component {:?} with pred {:?} putting payload {:?} on port {:?} (using var {:?})", + proto_component_id, &predicate, &payload, putter, var); let msg = SendPayloadMsg { predicate: predicate.clone(), payload }; rctx.putter_push(cu, putter, msg); drainer.add_input(predicate, branch); @@ -1033,6 +1034,7 @@ impl BranchingProtoComponent { for (k, v) in branch.inner.inbox.drain() { old.inner.inbox.insert(k, v); } + old.ended |= branch.ended; } } } @@ -1109,6 +1111,7 @@ impl SolutionStorage { let Self { subtree_solutions, new_local, old_local, .. } = self; let was_new = subtree_solutions[index].insert(predicate.clone()); if was_new { + // iterator over SETS of solutions, one for every component except `subtree_id` (me) let set_visitor = left.chain(right).map(|index| &subtree_solutions[index]); Self::elaborate_into_new_local_rec(cu, predicate, set_visitor, old_local, new_local); } @@ -1168,13 +1171,6 @@ impl NonsyncProtoContext<'_> { pub fn new_component(&mut self, moved_ports: HashSet, state: ComponentState) { // called by a PROTO COMPONENT. moves its own ports. // 1. sanity check: this component owns these ports - log!( - self.logger, - "Component {:?} added new component with state {:?}, moving ports {:?}", - self.proto_component_id, - &state, - &moved_ports - ); // sanity check for port in moved_ports.iter() { assert_eq!( @@ -1184,6 +1180,14 @@ impl NonsyncProtoContext<'_> { } // 2. create new component let new_cid = self.current_state.id_manager.new_component_id(); + log!( + self.logger, + "Component {:?} added new component {:?} with state {:?}, moving ports {:?}", + self.proto_component_id, + new_cid, + &state, + &moved_ports + ); self.unrun_components.push((new_cid, state)); // 3. update ownership of moved ports for port in moved_ports.iter() { @@ -1225,8 +1229,18 @@ impl NonsyncProtoContext<'_> { } impl ProtoComponentBranch { fn feed_msg(&mut self, getter: PortId, payload: Payload) { - let was = self.inner.inbox.insert(getter, payload); - assert!(was.is_none()) + let e = self.inner.inbox.entry(getter); + use std::collections::hash_map::Entry; + match e { + Entry::Vacant(ev) => { + // new message + ev.insert(payload); + } + Entry::Occupied(eo) => { + // redundant recv. can happen as a result of a component A having two branches X and Y related by + assert_eq!(eo.get(), &payload); + } + } } } impl<'a, K: Eq + Hash + 'static, V: 'static> CyclicDrainer<'a, K, V> { diff --git a/src/runtime/tests.rs b/src/runtime/tests.rs index e758c82a34f270a78d78d1574f131a6c9a574246..622f006ee0daf87fc2f3ff6c0c8a11843dd3abbe 100644 --- a/src/runtime/tests.rs +++ b/src/runtime/tests.rs @@ -1019,7 +1019,7 @@ fn pdl_msg_consensus() { fn sequencer3_prim() { let test_log_path = Path::new("./logs/sequencer3_prim"); let pdl = b" - primitive seq3primitive(out a, out b, out c) { + primitive sequencer3(out a, out b, out c) { int i = 0; while(true) synchronous { out to = a; @@ -1035,14 +1035,14 @@ fn sequencer3_prim() { let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - // setup a session between (a) native, and (b) primitive sequencer3, connected by 3 ports. + // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. let [p0, g0] = c.new_port_pair(); let [p1, g1] = c.new_port_pair(); let [p2, g2] = c.new_port_pair(); - c.add_component(b"seq3primitive", &[p0, p1, p2]).unwrap(); + c.add_component(b"sequencer3", &[p0, p1, p2]).unwrap(); c.connect(None).unwrap(); - let mut which_of_three = move || { + let which_of_three = move |c: &mut Connector| { // setup three sync batches. sync. return which succeeded c.get(g0).unwrap(); c.next_batch().unwrap(); @@ -1055,7 +1055,10 @@ fn sequencer3_prim() { const TEST_ROUNDS: usize = 50; // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...] for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) { - assert_eq!(expected_batch_idx, which_of_three()); + // silent round + assert_eq!(0, c.sync(None).unwrap()); + // non silent round + assert_eq!(expected_batch_idx, which_of_three(&mut c)); } } @@ -1079,7 +1082,7 @@ fn sequencer3_comp() { composite fifo1(in a, out b) { new fifo1_init(null, a, b); } - composite seq3composite(out a, out b, out c) { + composite sequencer3(out a, out b, out c) { channel d -> e; channel f -> g; channel h -> i; @@ -1098,26 +1101,153 @@ fn sequencer3_comp() { let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); - // setup a session between (a) native, and (b) composite sequencer3, connected by 3 ports. + // setup a session between (a) native, and (b) sequencer3, connected by 3 ports. let [p0, g0] = c.new_port_pair(); let [p1, g1] = c.new_port_pair(); let [p2, g2] = c.new_port_pair(); - c.add_component(b"seq3composite", &[p0, p1, p2]).unwrap(); + c.add_component(b"sequencer3", &[p0, p1, p2]).unwrap(); c.connect(None).unwrap(); - let mut which_of_three = move || { + let which_of_three = move |c: &mut Connector| { // setup three sync batches. sync. return which succeeded c.get(g0).unwrap(); c.next_batch().unwrap(); c.get(g1).unwrap(); c.next_batch().unwrap(); c.get(g2).unwrap(); - c.sync(None).unwrap() + c.sync(SEC1).unwrap() }; const TEST_ROUNDS: usize = 50; // check that the batch index for rounds 0..TEST_ROUNDS are [0, 1, 2, 0, 1, 2, ...] for expected_batch_idx in (0..=2).cycle().take(TEST_ROUNDS) { - assert_eq!(expected_batch_idx, which_of_three()); + // silent round + assert_eq!(0, c.sync(SEC1).unwrap()); + // non silent round + assert_eq!(expected_batch_idx, which_of_three(&mut c)); + } +} + +enum XRouter2Item { + Silent, + GetA, + GetB, +} +// Hardcoded pseudo-random sequence of round behaviors for the native component +const XROUTER2_ITEMS: &[XRouter2Item] = { + use XRouter2Item::{GetA as A, GetB as B, Silent as S}; + &[ + B, A, S, B, A, A, B, S, B, S, A, A, S, B, B, S, B, S, B, B, S, B, B, A, B, B, A, B, A, B, + S, B, S, B, S, A, S, B, A, S, B, A, B, S, B, S, B, S, S, B, B, A, A, A, S, S, S, B, A, A, + A, S, S, B, B, B, A, B, S, S, A, A, B, A, B, B, A, A, A, B, A, B, S, A, B, S, A, A, B, S, + ] +}; + +#[test] +fn xrouter2_prim() { + let test_log_path = Path::new("./logs/xrouter2_prim"); + let pdl = b" + primitive xrouter2(in a, out b, out c) { + while(true) synchronous { + if(fires(a)) { + if(fires(b)) put(b, get(a)); + else put(c, get(a)); + } + } + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) xrouter2, connected by 3 ports. + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + let [p2, g2] = c.new_port_pair(); + c.add_component(b"xrouter2", &[g0, p1, p2]).unwrap(); + c.connect(None).unwrap(); + + let now = std::time::Instant::now(); + for item in XROUTER2_ITEMS.iter() { + match item { + XRouter2Item::Silent => {} + XRouter2Item::GetA => { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(g1).unwrap(); + } + XRouter2Item::GetB => { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(g2).unwrap(); + } + } + assert_eq!(0, c.sync(SEC1).unwrap()); + } + println!("PRIM {:?}", now.elapsed()); +} +#[test] +fn xrouter2_comp() { + let test_log_path = Path::new("./logs/xrouter2_comp"); + let pdl = b" + primitive lossy(in a, out b) { + while(true) synchronous { + if(fires(a)) { + msg m = get(a); + if(fires(b)) put(b, get(a)); + } + } + } + primitive sync_drain2(in a, in b) { + while(true) synchronous { + if(fires(a)) { + get(a); + get(b); + } + } + } + composite xrouter2(in a, out b, out c) { + channel d -> e; + channel f -> g; + channel h -> i; + channel j -> k; + channel l -> m; + channel n -> o; + channel p -> q; + channel r -> s; + channel t -> u; + + new replicator2(a, d, f); // ok + new replicator2(g, t, h); // ok + new lossy(e, l); // ok + new lossy(i, j); // ok + new replicator2(m, b, p); // ok + new replicator2(k, n, c); // ok + new merger2(q, o, r); + new sync_drain2(u, s); + } + "; + let pd = reowolf::ProtocolDescription::parse(pdl).unwrap(); + let mut c = file_logged_configured_connector(0, test_log_path, Arc::new(pd)); + + // setup a session between (a) native, and (b) xrouter2, connected by 3 ports. + let [p0, g0] = c.new_port_pair(); + let [p1, g1] = c.new_port_pair(); + let [p2, g2] = c.new_port_pair(); + c.add_component(b"xrouter2", &[g0, p1, p2]).unwrap(); + c.connect(None).unwrap(); + + let now = std::time::Instant::now(); + for item in XROUTER2_ITEMS.iter() { + match item { + XRouter2Item::Silent => {} + XRouter2Item::GetA => { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(g1).unwrap(); + } + XRouter2Item::GetB => { + c.put(p0, TEST_MSG.clone()).unwrap(); + c.get(g2).unwrap(); + } + } + assert_eq!(0, c.sync(SEC1).unwrap()); } + println!("COMP {:?}", now.elapsed()); }