diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 3899d7d4c6bd87e943fad19518069369204f4a99..03499cbc17803f00590555ee5de61b0553487c4d 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -114,7 +114,7 @@ impl<'a> RunContext for ConnectorRunContext<'a>{ return match self.prepared.take() { PreparedStatement::None => None, PreparedStatement::CreatedChannel(ports) => Some(ports), - taken => unreachable!("prepared statement is '{:?}' during 'created_channel)_'", taken), + taken => unreachable!("prepared statement is '{:?}' during 'created_channel()'", taken), }; } @@ -386,18 +386,24 @@ impl ConnectorPDL { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - if let Err(_) = comp_ctx.submit_message(Message::Data(DataMessage { - sync_header, data_header, - content, - })) { - // We don't own the port - let pd = &sched_ctx.runtime.protocol_description; - let eval_error = branch.code_state.new_error_at_expr( - &pd.modules, &pd.heap, - String::from("attempted to 'put' on port that is no longer owned") - ); - self.eval_error = Some(eval_error); - self.mode = Mode::SyncError; + let message = DataMessage{ sync_header, data_header, content }; + match comp_ctx.submit_message(Message::Data(message)) { + Ok(_) => { + // Message is underway + branch.prepared = PreparedStatement::PerformedPut; + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; + }, + Err(_) => { + // We don't own the port + let pd = &sched_ctx.runtime.protocol_description; + let eval_error = branch.code_state.new_error_at_expr( + &pd.modules, &pd.heap, + String::from("attempted to 'put' on port that is no longer owned") + ); + self.eval_error = Some(eval_error); + self.mode = Mode::SyncError; + } } branch.prepared = PreparedStatement::PerformedPut; diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index b9980cbe237784ea95c05c76c986e651322ba3b3..7da2a179ad5808552f3503de09e651a7d5badf20 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -186,7 +186,12 @@ impl Consensus { target_port: port_info.peer_id, content: SyncPortContent::NotificationWave, }; - ctx.submit_message(Message::SyncPort(message)); + + // Note: submitting the message might fail. But we're attempting to + // handle the error anyway. + // TODO: Think about this a second time: how do we make sure the + // entire network will fail if we reach this condition + let _unused = ctx.submit_message(Message::SyncPort(message)); } return maybe_conclusion; @@ -265,7 +270,7 @@ impl Consensus { let channel_id = port_desc.channel_id; if !self.encountered_ports.contains(&self_port_id) { - ctx.submit_message(Message::SyncPort(SyncPortMessage { + let message = SyncPortMessage { sync_header: SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, @@ -274,8 +279,17 @@ impl Consensus { source_port: self_port_id, target_port: peer_port_id, content: SyncPortContent::SilentPortNotification, - })); - self.encountered_ports.push(self_port_id); + }; + match ctx.submit_message(Message::SyncPort(message)) { + Ok(_) => { + self.encountered_ports.push(self_port_id); + }, + Err(_) => { + // Seems like we were done with this branch, but one of + // the silent ports (in scope) is actually closed + return self.notify_of_fatal_branch(branch_id, ctx); + } + } } target_mapping.push(( @@ -469,7 +483,9 @@ impl Consensus { target_port: port_desc.peer_id, content: SyncPortContent::NotificationWave, }; - ctx.submit_message(Message::SyncPort(message)).unwrap(); + // As with the other SyncPort where we throw away the + // result: we're dealing with an error here anyway + let _unused = ctx.submit_message(Message::SyncPort(message)); } } } @@ -566,7 +582,7 @@ impl Consensus { target_component_id: peer.id, content: SyncCompContent::Notification, }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } // But also send our locally combined solution @@ -579,7 +595,7 @@ impl Consensus { target_component_id: sync_header.sending_component_id, content: SyncCompContent::Notification }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } // else: exactly equal, so do nothing return true; @@ -665,7 +681,7 @@ impl Consensus { target_component_id: self.highest_connector_id, content, }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } return None; @@ -690,7 +706,7 @@ impl Consensus { target_component_id: connector_id, content: SyncCompContent::GlobalSolution(global_solution.clone()), }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } debug_assert!(my_final_branch_id.is_valid()); @@ -716,7 +732,7 @@ impl Consensus { target_component_id: presence.added_by, content: SyncCompContent::GlobalFailure, }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } } } @@ -748,7 +764,7 @@ impl Consensus { target_component_id: self.highest_connector_id, content: SyncCompContent::PartialSolution(partial_solution), }; - ctx.submit_message(Message::SyncComp(message)); + ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } } } diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 333f3e860354847eacb74b10880d7ff759dd1e82..bb039dbccc97e7707b8108ff31d0b73c5527b785 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -431,7 +431,7 @@ impl ConnectorStore { } } - // println!("DEBUG [ global store ] Created component at {}", key.index); + println!("DEBUG [ global store ] Created component at {}", key.index); return key; } @@ -444,7 +444,7 @@ impl ConnectorStore { // Note: but not deallocating! } - // println!("DEBUG [ global store ] Destroyed component at {}", key.index); + println!("DEBUG [ global store ] Destroyed component at {}", key.index); self.free.push(key.index as usize); } } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 14ebc566d0fa252649a9a9a71c91a29a432aad7b..8e33bedf26f4d0290cca2be1eeea18447ada1cb5 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -405,11 +405,11 @@ impl Scheduler { // TODO: Remove, this is debugging stuff fn debug(&self, message: &str) { - // println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); + println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); } fn debug_conn(&self, conn: ConnectorId, message: &str) { - // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); + println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); } } diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index be4f43c5f300a707b8ea07ac8f7b2cac72ed69c4..987afe9e1b57e9b32556bf849e308aa34a3b60f3 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -15,9 +15,9 @@ use crate::runtime2::native::{ApplicationSyncAction}; // pub(crate) const NUM_INSTANCES: u32 = 7; // number of test instances constructed // pub(crate) const NUM_LOOPS: u32 = 8; // number of loops within a single test (not used by all tests) -pub(crate) const NUM_THREADS: u32 = 1; -pub(crate) const NUM_INSTANCES: u32 = 5; -pub(crate) const NUM_LOOPS: u32 = 5; +pub(crate) const NUM_THREADS: u32 = 2; +pub(crate) const NUM_INSTANCES: u32 = 1; +pub(crate) const NUM_LOOPS: u32 = 1; fn create_runtime(pdl: &str) -> Runtime { diff --git a/src/runtime2/tests/sync_failure.rs b/src/runtime2/tests/sync_failure.rs index f3acdabcf1fbbd962ce73e7fe41de47edde701db..04e57cd267bb2ee220ee613790d0223fcb8caabf 100644 --- a/src/runtime2/tests/sync_failure.rs +++ b/src/runtime2/tests/sync_failure.rs @@ -65,7 +65,7 @@ fn test_shared_sync_failure() { "; run_test_in_runtime(CODE, |api| { - for variant in 0..4 { // all `Location` enum variants, except `Never`. + for variant in 3..4 { // all `Location` enum variants, except `Never`. // Create the channels api.create_connector("", "constructor", ValueGroup::new_stack(vec![ Value::Enum(variant)