diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 8e5e985f91578aa95625780c1bd0b1d30e83042a..b9980cbe237784ea95c05c76c986e651322ba3b3 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -350,7 +350,10 @@ impl Consensus { // TODO: Handle multiple firings. Right now we just assign the current // branch to the `None` value because we know we can only send once. let data_header = DataHeader{ - expected_mapping: branch.channel_mapping.clone(), + expected_mapping: branch.channel_mapping.iter() + .filter(|v| v.registered_id.is_some() || v.channel_id == port_info.channel_id) + .copied() + .collect(), sending_port: port_info.self_id, target_port: port_info.peer_id, new_mapping: branch.cur_marker, diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index bb039dbccc97e7707b8108ff31d0b73c5527b785..333f3e860354847eacb74b10880d7ff759dd1e82 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -431,7 +431,7 @@ impl ConnectorStore { } } - println!("DEBUG [ global store ] Created component at {}", key.index); + // println!("DEBUG [ global store ] Created component at {}", key.index); return key; } @@ -444,7 +444,7 @@ impl ConnectorStore { // Note: but not deallocating! } - println!("DEBUG [ global store ] Destroyed component at {}", key.index); + // println!("DEBUG [ global store ] Destroyed component at {}", key.index); self.free.push(key.index as usize); } } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 3bc9c94477458743706ed7f8c32e622a41ef3bd6..14ebc566d0fa252649a9a9a71c91a29a432aad7b 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -208,7 +208,7 @@ impl Scheduler { // Note: we're not handling the public inbox, we're dealing with the // private one! debug_assert!(scheduled.shutting_down); - while let Some(message) = scheduled.ctx.read_next_message() { + while let Some(message) = scheduled.ctx.read_next_message_even_if_not_in_sync() { let target_port_and_round_number = match &message { Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)), Message::SyncComp(_) => None, @@ -405,11 +405,11 @@ impl Scheduler { // TODO: Remove, this is debugging stuff fn debug(&self, message: &str) { - println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); + // println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); } fn debug_conn(&self, conn: ConnectorId, message: &str) { - println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); + // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); } } @@ -579,6 +579,10 @@ impl ComponentCtx { // TODO: Fix the clone of the data message, entirely unnecessary pub(crate) fn read_next_message(&mut self) -> Option { if !self.is_in_sync { return None; } + return self.read_next_message_even_if_not_in_sync(); + } + + pub(crate) fn read_next_message_even_if_not_in_sync(&mut self) -> Option { if self.inbox_len_read == self.inbox_messages.len() { return None; } // We want to keep data messages in the inbox, because we need to check diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 29c6c9621e87b27bf8a198a446a90a16aa477171..be4f43c5f300a707b8ea07ac8f7b2cac72ed69c4 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -16,8 +16,8 @@ use crate::runtime2::native::{ApplicationSyncAction}; // pub(crate) const NUM_LOOPS: u32 = 8; // number of loops within a single test (not used by all tests) pub(crate) const NUM_THREADS: u32 = 1; -pub(crate) const NUM_INSTANCES: u32 = 1; -pub(crate) const NUM_LOOPS: u32 = 1; +pub(crate) const NUM_INSTANCES: u32 = 5; +pub(crate) const NUM_LOOPS: u32 = 5; fn create_runtime(pdl: &str) -> Runtime { diff --git a/src/runtime2/tests/sync_failure.rs b/src/runtime2/tests/sync_failure.rs index 392b8a22fa18fbf5c6fe934059189f7bfd84bea9..f3acdabcf1fbbd962ce73e7fe41de47edde701db 100644 --- a/src/runtime2/tests/sync_failure.rs +++ b/src/runtime2/tests/sync_failure.rs @@ -36,7 +36,9 @@ fn test_local_sync_failure() { #[test] fn test_shared_sync_failure() { // Same as above. One of the components should fail, the other should follow - // suit because it cannot complete a sync round. + // suit because it cannot complete a sync round. We intentionally have an + // infinite loop in the while condition because we need at least two loops + // for the last error to get picked up. const CODE: &'static str = " enum Location { BeforeSync, AfterPut, AfterGet, AfterSync, Never } primitive failing_at_location(in input, out output, Location loc) { @@ -63,7 +65,7 @@ fn test_shared_sync_failure() { "; run_test_in_runtime(CODE, |api| { - for variant in 0..1 { + for variant in 0..4 { // all `Location` enum variants, except `Never`. // Create the channels api.create_connector("", "constructor", ValueGroup::new_stack(vec![ Value::Enum(variant)