From 8da52bdbcaa739886eea2400f8cc57c3271d9f73 2022-04-22 11:32:53 From: MH Date: 2022-04-22 11:32:53 Subject: [PATCH] Initial extension of error-handling to detect pre-sync errors --- diff --git a/src/protocol/eval/executor.rs b/src/protocol/eval/executor.rs index c755413a0da76f4a30899939d3d1873eaaba0c39..3507bdbbb6d77b6868015cda70c68f49b98b634f 100644 --- a/src/protocol/eval/executor.rs +++ b/src/protocol/eval/executor.rs @@ -728,7 +728,7 @@ impl Prompt { // Convert the runtime-variant of a string // into an actual string. let value = cur_frame.expr_values.pop_front().unwrap(); - let mut is_literal_string = value.get_heap_pos().is_some(); + let is_literal_string = value.get_heap_pos().is_some(); let value = self.store.maybe_read_ref(&value); let value_heap_pos = value.as_string(); let elements = &self.store.heap_regions[value_heap_pos as usize].values; diff --git a/src/protocol/parser/pass_rewriting.rs b/src/protocol/parser/pass_rewriting.rs index 097359f0e0eddc48e4887a0485f0f5cc9d9e74d8..2b1535fd503abe31a50b9c17eec172158a695f0d 100644 --- a/src/protocol/parser/pass_rewriting.rs +++ b/src/protocol/parser/pass_rewriting.rs @@ -232,10 +232,12 @@ impl Visitor for PassRewriting { let case_num_ports = case.involved_ports.len(); for case_port_index in 0..case_num_ports { - // Arguments to runtime call - let original_get_call_id = case.involved_ports[case_port_index].0; + // Retrieve original span in case of error reporting for the + // inserted function call + let original_get_call_id = ctx.heap[id].cases[case_index].involved_ports[case_port_index].0; let original_get_call_span = ctx.heap[original_get_call_id].full_span; + // Arguments to runtime call let (port_variable_id, port_variable_type) = locals[total_port_index]; // so far this variable contains the temporary variables for the port expressions let case_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_index as u64, ctx.arch.uint32_type_id); let port_index_expr_id = 
create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_port_index as u64, ctx.arch.uint32_type_id); diff --git a/src/runtime2/component/component.rs b/src/runtime2/component/component.rs index 5d1b853af70754d563968c1b355dadd4c0c48e92..914ad380d3a355ed5516cf05c7063c80499fcdcc 100644 --- a/src/runtime2/component/component.rs +++ b/src/runtime2/component/component.rs @@ -67,7 +67,9 @@ pub(crate) trait Component { fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling; } -/// Representation of the generic operating mode of a component. +/// Representation of the generic operating mode of a component. Although not +/// every state may be used by every kind of (builtin) component, this allows +/// writing standard handlers for particular events in a component's lifetime. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) enum CompMode { NonSync, // not in sync mode @@ -179,6 +181,14 @@ impl CompExecState { } } +/// Generic representation of a component's inbox. +// NOTE: It would be nice to remove the `backup` at some point and make the +// blocking occur the moment a peer tries to send a message. +pub(crate) struct CompInbox { + main: Vec<Option<DataMessage>>, + backup: Vec<Message>, +} + /// Creates a new component based on its definition. Meaning that if it is a /// user-defined component then we set up the PDL code state. Otherwise we /// construct a custom component. This does NOT take care of port and message @@ -409,28 +419,38 @@ pub(crate) fn default_handle_control_message( // sent to one another. if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) { // The two components (sender and this component) are closing - // the channel at the same time. + // the channel at the same time. So we don't care about the + // content of the `ClosePort` message. 
default_handle_ack(control, control_id, sched_ctx, comp_ctx); } else { // Respond to the message let last_instruction = port_info.last_instruction; - let port_was_used = last_instruction != PortInstruction::None; + let port_has_had_message = port_info.received_message_for_sync; default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx); comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed - comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed - - // Make sure that we've not reached an error condition. Note - // that if this condition is not met, then we don't error out - // now, but we may error out in the next sync block when we - // try to `put`/`get` on the port. This condition makes sure - // that if we have a successful sync round, followed by the peer - // closing the port, that we don't consider the sync round to - // have failed by mistake. - if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used { - return Err(( - last_instruction, - format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0) - )); + + // Handle any possible error conditions (which boil down to: the + // port has been used, but the peer has died). If not in sync + // mode then we close the port immediately. + + // Note that `port_was_used` does not mean that any messages + // were actually received. It might also mean that e.g. the + // component attempted a `get`, but there were no messages, so + // now it is in the `BlockedGet` state. 
+ let port_was_used = last_instruction != PortInstruction::None; + + if exec_state.mode.is_in_sync_block() { + let closed_during_sync_round = content.closed_in_sync_round && port_was_used; + // TODO: Finish this + + if closed_during_sync_round { + return Err(( + last_instruction, + format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0) + )); + } + } else { + comp_ctx.set_port_state(port_handle, PortState::Closed); } } }, @@ -476,6 +496,12 @@ pub(crate) fn default_handle_control_message( return Ok(()); } +/// Handles a component entering the synchronous block. Will ensure that the +/// `Consensus` and the `ComponentCtx` are initialized properly. +pub(crate) fn default_handle_start_sync( + exec_state: &mut CompExecState, +) {} + /// Handles a component initiating the exiting procedure, and closing all of its /// ports. Should only be called once per component (which is ensured by /// checking and modifying the mode in the execution state). @@ -493,7 +519,7 @@ pub(crate) fn default_handle_start_exit( // round that we've failed. 
if exit_inside_sync { let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx); - default_handle_sync_decision(sched_ctx, exec_state, decision, consensus); + default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus); } // Iterating over ports by index to work around borrowing rules diff --git a/src/runtime2/component/component_context.rs b/src/runtime2/component/component_context.rs index c0ec092a208d6353fa9f266ed939cc749fb9288a..c02320d58b382798cd59dff48078bd4f00e9b7d6 100644 --- a/src/runtime2/component/component_context.rs +++ b/src/runtime2/component/component_context.rs @@ -14,13 +14,19 @@ pub enum PortInstruction { #[derive(Debug)] pub struct Port { + // Identifiers pub self_id: PortId, pub peer_comp_id: CompId, // eventually consistent pub peer_port_id: PortId, // eventually consistent + // Generic operating state pub kind: PortKind, pub state: PortState, - pub last_instruction: PortInstruction, // used during sync round in case port ends up being closed for error reporting - pub close_at_sync_end: bool, // used during sync round when receiving a `ClosePort(not_in_sync_round)` message + // State tracking for error detection and error handling + pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors + pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors + pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message + // Debugging flag to make sure each port is properly associated and + // disassociated with a peer component #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool, } @@ -72,6 +78,7 @@ impl CompCtx { peer_comp_id: self.id, last_instruction: PortInstruction::None, close_at_sync_end: false, + received_message_for_sync: false, #[cfg(debug_assertions)] associated_with_peer: false, }); self.ports.push(Port{ @@ -82,6 +89,7 @@ impl CompCtx { peer_comp_id: self.id, 
last_instruction: PortInstruction::None, close_at_sync_end: false, + received_message_for_sync: false, #[cfg(debug_assertions)] associated_with_peer: false, }); @@ -95,6 +103,7 @@ impl CompCtx { self_id, peer_comp_id, peer_port_id, kind, state, last_instruction: PortInstruction::None, close_at_sync_end: false, + received_message_for_sync: false, #[cfg(debug_assertions)] associated_with_peer: false, }); return LocalPortHandle(self_id); diff --git a/src/runtime2/component/component_internet.rs b/src/runtime2/component/component_internet.rs index 7e3ba9ca5512298f3a6bdbf2c7277889fb4cabec..cd80e3b817f8818688e722288db1721673a55d6c 100644 --- a/src/runtime2/component/component_internet.rs +++ b/src/runtime2/component/component_internet.rs @@ -104,7 +104,7 @@ impl Component for ComponentTcpClient { }, Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); }, Message::Control(message) => { if let Err(location_and_message) = component::default_handle_control_message( @@ -238,7 +238,7 @@ impl Component for ComponentTcpClient { // finish the sync round let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); self.exec_state.mode = CompMode::SyncEnd; - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); return CompScheduling::Immediate; }, SyncState::Getting => { @@ -275,7 +275,7 @@ impl Component for ComponentTcpClient { SyncState::FinishSync | SyncState::FinishSyncThenQuit => { let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); self.exec_state.mode = CompMode::SyncEnd; - 
component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); return CompScheduling::Requeue; }, } diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index cc1b91b1a9023cc110ec827b2ad7e1022b3027c1..092b5d9e444392d7cb5a700e5143f1979e258a6b 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -347,7 +347,7 @@ impl Component for CompPDL { self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr( &self.prompt, &protocol.modules, &protocol.heap, expr_id, String::from("Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s") - )); + ))); return CompScheduling::Sleep; } } else if port_is_closed { @@ -601,7 +601,7 @@ impl CompPDL { fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); } /// Handles an error coming from the generic `component::handle_xxx` diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs index dc8a63334b8e19a37604d51f35b79cd67734cf8d..07bb9bbee0bdedaaa31cd3339b942d37b71ac007 100644 --- a/src/runtime2/component/component_random.rs +++ b/src/runtime2/component/component_random.rs @@ -46,7 +46,7 @@ impl Component for ComponentRandomU32 { Message::Data(_message) => unreachable!(), Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); - 
component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); }, Message::Control(message) => { if let Err(location_and_message) = component::default_handle_control_message( @@ -123,7 +123,7 @@ impl Component for ComponentRandomU32 { sched_ctx.log("Waiting for consensus"); self.exec_state.mode = CompMode::SyncEnd; let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); - component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); return CompScheduling::Requeue; } },