diff --git a/docs/runtime/sync.md b/docs/runtime/sync.md index 16ddcfda0a0f6e84d51d70719ca2349e975cce65..305e02ae0f4a03f02538f64303e52c99c6b2dfa6 100644 --- a/docs/runtime/sync.md +++ b/docs/runtime/sync.md @@ -200,4 +200,4 @@ Suppose a synchronous region is (partially) established, and the component `E` e So that is to say that this `ClosePort(sync)` causes instant failure of `C` if it has used the closed port in a round without consensus, or if it uses that port in the future. Note that this `ClosePort(sync)` system causes cascading failures throughout the disjoint synchronous regions. This is as intended: once one component's PDL program can no longer be executed, we cannot depend on the discovery of all the peers that constitute the intended synchronous region. So instead we rely on a peer-to-peer mechanism to make sure that every component is notified of failure. -However, while these cascading peer-to-peer `ClosePort(sync)` messages are happily shared around, we still have a leader component somewhere, and components that have not yet been notified of the failure. Here we can make several design choices to \ No newline at end of file +However, while these cascading peer-to-peer `ClosePort(sync)` messages are happily shared around, we still have a leader component somewhere, and components that have not yet been notified of the failure. \ No newline at end of file diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 1ce574c330f24cb8b74fc25d7d5fdb35d5e6c00e..717810237d4a6aa84b74f9e6e7ae0f2adab9ce88 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -425,6 +425,7 @@ pub(crate) fn default_handle_control_message( // that if we have a successful sync round, followed by the peer // closing the port, that we don't consider the sync round to // have failed by mistake. + let error_due_to_port_use = if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used { return Err(( last_instruction, diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index e9e41ac593a5b19ee65ba52fb9d25647a1a9fe4c..7e3ba9ca5512298f3a6bdbf2c7277889fb4cabec 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -207,6 +207,8 @@ impl Component for ComponentTcpClient { return CompScheduling::Sleep; } } else { + let port_handle = comp_ctx.get_port_handle(self.pdl_input_port_id); + comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::NoSource; self.exec_state.set_as_blocked_get(self.pdl_input_port_id); return CompScheduling::Sleep; } diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index 0e9bf987c38a1fcbc10e80cba1f1aff0b5f5df3d..49558c30e73d6fb90b48875c9f3bf0f8fa248450 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -318,6 +318,8 @@ impl Component for CompPDL { let port_id = port_id_from_eval(port_id); let port_handle = comp_ctx.get_port_handle(port_id); + comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::SourceLocation(expr_id); + let port_index = comp_ctx.get_port_index(port_handle); if let Some(message) = &self.inbox_main[port_index] { // Check if we can actually receive the message @@ -578,7 +580,7 @@ impl CompPDL { } }; - self.handle_component_error(sched_Ctx, error); + self.handle_component_error(sched_ctx, error); } fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, error: CompError) { diff --git a/src/runtime2/tests/error_handling.rs b/src/runtime2/tests/error_handling.rs new file mode 100644 index 0000000000000000000000000000000000000000..8405520d7e74f57b8ac02a1828d3bc86ae275d95 --- /dev/null +++ b/src/runtime2/tests/error_handling.rs @@ -0,0 +1,32 @@ +use super::*; + +#[test] +fn test_unconnected_component_error() { + let pd = ProtocolDescription::parse(b" + primitive interact_with_noone() { + u8[] array = { 5 }; + auto value = array[1]; + } + ").unwrap(); + let rt = Runtime::new(1, true, pd).unwrap(); + create_component(&rt, "", "interact_with_noone", no_args()); +} + +#[test] +fn test_connected_uncommunicating_component_error() { + let pd = ProtocolDescription::parse(b" + primitive crashing_and_burning(out unused) { + u8[] array = { 1337 }; + auto value = array[1337]; + } + primitive sitting_idly_waiting(in never_providing) { + sync auto a = get(never_providing); + } + composite constructor() { + channel a -> b; + new sitting_idly_waiting(b); + new crashing_and_burning(a); + }").unwrap(); + let rt = Runtime::new(1, true, pd).unwrap(); + create_component(&rt, "", "constructor", no_args()); +} \ No newline at end of file diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index fb47364e1adef4283a1a5f4961be67cd7bead285..6583f36c165d9eb7156bf872c743371240a15fc8 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -3,7 +3,9 @@ use crate::protocol::eval::*; use crate::runtime2::runtime::*; use crate::runtime2::component::{CompCtx, CompPDL}; -fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) { +mod error_handling; + +pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) { let prompt = rt.inner.protocol.new_component( module_name.as_bytes(), routine_name.as_bytes(), args ).expect("create prompt"); @@ -14,7 +16,7 @@ fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: V rt.inner.enqueue_work(key); } -fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) } +pub(crate) fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) } #[test] fn test_component_creation() {