Changeset - e7e7211531c8
[Not reviewed]
0 5 1
mh - 3 years ago 2022-04-21 08:50:30
contact@maxhenger.nl
Initial error-handling tests
6 files changed with 43 insertions and 4 deletions:
0 comments (0 inline, 0 general)
docs/runtime/sync.md
Show inline comments
 
@@ -179,25 +179,25 @@ In this case it is sufficient for `E` to send around a `ClosePort` message. As d
 
   - `L` broadcasts `Solution` for the current sync round.
 
   - `E` receives `Solution` finishes round.
 
   - `E` encounters an error, so sends `ClosePort` to `C`.
 
   - `C` receives `Solution` from `L`.
 
   - `C` receives `ClosePort` from `E`.
 

	
 
In all described cases `E` encounters an error after finishing a sync round. But from the point of view of `C` it is unsure whether the `ClosePort` message pertains to the current synchronous round or not. In case 1 and 3 nothing is out of the ordinary. But in case 2 we have that `C` is at a particular point in time aware of the `ClosePort` from `E`, but not yet of the `Solution` from `L`. `C` should not fail the sync round, as it is completed, but it is unaware of this fact.
 

	
 
As a rather simple solution, since components that are participating with one another in a sync round move in lock-step at the end of the sync block, we send a boolean along with the `ClosePort`, e.g. `ClosePort(nonsync)`. This boolean indicates whether `E` was inside or outside of a sync block during it encountering an error. Now `C` can distinguish between the three cases: in all cases it agrees that `E` was not in a sync block (and hence: the sync round in cases 2 and 3 can be completed).
 

	
 
### Handling Errors Inside of a Sync Block
 

	
 
If `E` is inside of a sync block. Then it has interacted with other components. Our requirement now is that the sync round fails (and ofcourse, that all of the peers are notified that `E` will no longer be present in the runtime). There are two things that are complicating this type of failure:
 

	
 
1. Suppose that in the successful case of the synchronous interaction, there are a large number of components interacting with one another. Now it might be that `E` fails very early in its sync block, such that it cannot interact with several components. This lack of interaction might cause the single sync block to break up into several smaller sync blocks. Each of these separated regions is supposed to fail.
 
2. Within a particular synchronous interaction we might have that the leader `L` has a reference to the component `E` without it being a direct peer. There is a reference counting system in place that makes sure that `L` can always send messages to `E`. But we still need to make sure that those references stay alive for as long as needed. 
 

	
 
Suppose a synchronous region is (partially) established, and the component `E` encounters a critical error. The two points given above imply that two processes need to be initiated. For the first error-handling process, we simply use the same scheme as described in the case where `E` is not in a synchronous region. However now we broadcast `ClosePort(sync)` instead of `ClosePort(nonsync)` messages. Consider the following two cases: 
 

	
 
1. Component `C` is not part of the same synchronous region as `E`. And component `C` has tried `put`ting to `E`. If `C` receives a `ClosePort(sync)`, then it knows that its interaction should fail. Note: it might be that `E` wasn't planning on `get`ting from `C` in the sync round in which `E` failed, but much later. In that case it still makes sense for `C` to fail; it would have failed in the future. A small inconsistency here (within the current infinitely-deadlocking implementation) is that if `E` would *never* `get` from `C`, then `C` would deadlock instead of crash (one could argue that this implies that deadlocking should lead to crashing through a timeout mechanism).
 
2. Component `C` is not part of the same synchronous region as `E`. And if `E` wouldn't have crashed, then it would've `put` a message to `C`. In this case it is still proper for `C` to crash. The reasoning works the same as above.
 

	
 
So that is to say that this `ClosePort(sync)` causes instant failure of `C` if it has used the closed port in a round without consensus, or if it uses that port in the future. Note that this `ClosePort(sync)` system causes cascading failures throughout the disjoint synchronous regions. This is as intended: once one component's PDL program can no longer be executed, we cannot depend on the discovery of all the peers that constitute the intended synchronous region. So instead we rely on a peer-to-peer mechanism to make sure that every component is notified of failure.
 

	
 
However, while these cascading peer-to-peer `ClosePort(sync)` messages are happily shared around, we still have a leader component somewhere, and components that have not yet been notified of the failure. Here we can make several design choices to 
 
\ No newline at end of file
 
However, while these cascading peer-to-peer `ClosePort(sync)` messages are happily shared around, we still have a leader component somewhere, and components that have not yet been notified of the failure.
 
\ No newline at end of file
src/runtime2/component/component.rs
Show inline comments
 
@@ -404,48 +404,49 @@ pub(crate) fn default_handle_control_message(
 
            let peer_handle = comp_ctx.get_peer_handle(peer_comp_id);
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time.
 
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
 
            } else {
 
                // Respond to the message
 
                let last_instruction = port_info.last_instruction;
 
                let port_was_used = last_instruction != PortInstruction::None;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
 
                comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
 

	
 
                // Make sure that we've not reached an error condition. Note
 
                // that if this condition is not met, then we don't error out
 
                // now, but we may error out in the next sync block when we
 
                // try to `put`/`get` on the port. This condition makes sure
 
                // that if we have a successful sync round, followed by the peer
 
                // closing the port, that we don't consider the sync round to
 
                // have failed by mistake.
 
                let error_due_to_port_use =  
 
                if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used {
 
                    return Err((
 
                        last_instruction,
 
                        format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0)
 
                    ));
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort(port_id) => {
 
            // We were previously blocked (or already closed)
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
            let port_info = comp_ctx.get_port(port_handle);
 
            debug_assert_eq!(port_info.kind, PortKind::Putter);
 
            if port_info.state == PortState::BlockedDueToFullBuffers {
 
                default_handle_unblock_put(exec_state, consensus, port_handle, sched_ctx, comp_ctx);
 
            }
 
        },
 
        ControlMessageContent::PortPeerChangedBlock(port_id) => {
 
            // The peer of our port has just changed. So we are asked to
 
            // temporarily block the port (while our original recipient is
 
            // potentially rerouting some of the in-flight messages) and
 
            // Ack. Then we wait for the `unblock` call.
 
            debug_assert_eq!(message.target_port_id, Some(port_id));
 
            let port_handle = comp_ctx.get_port_handle(port_id);
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -186,48 +186,50 @@ impl Component for ComponentTcpClient {
 

	
 
                                        self.sync_state = SyncState::Putting;
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_receive_tag_value {
 
                                        // Component requires a `recv`
 
                                        self.sync_state = SyncState::Getting;
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_finish_tag_value {
 
                                        // Component requires us to end the sync round
 
                                        self.sync_state = SyncState::FinishSync;
 
                                        return CompScheduling::Immediate;
 
                                    } else if tag_value == self.input_union_shutdown_tag_value {
 
                                        // Component wants to close the connection
 
                                        self.sync_state = SyncState::FinishSyncThenQuit;
 
                                        return CompScheduling::Immediate;
 
                                    } else {
 
                                        unreachable!("got tag_value {}", tag_value)
 
                                    }
 
                                }
 
                            } else {
 
                                todo!("handle sync failure due to message deadlock");
 
                                return CompScheduling::Sleep;
 
                            }
 
                        } else {
 
                            let port_handle = comp_ctx.get_port_handle(self.pdl_input_port_id);
 
                            comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::NoSource;
 
                            self.exec_state.set_as_blocked_get(self.pdl_input_port_id);
 
                            return CompScheduling::Sleep;
 
                        }
 
                    },
 
                    SyncState::Putting => {
 
                        // We're supposed to send a user-supplied message fully
 
                        // over the socket. But we might end up blocking. In
 
                        // that case the component goes to sleep until it is
 
                        // polled.
 
                        let socket = self.socket_state.get_socket();
 
                        while !self.byte_buffer.is_empty() {
 
                            match socket.send(&self.byte_buffer) {
 
                                Ok(bytes_sent) => {
 
                                    self.byte_buffer.drain(..bytes_sent);
 
                                },
 
                                Err(err) => {
 
                                    if err.kind() == IoErrorKind::WouldBlock {
 
                                        return CompScheduling::Sleep; // wait until notified
 
                                    } else {
 
                                        todo!("handle socket.send error {:?}", err)
 
                                    }
 
                                }
 
                            }
 
                        }
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -297,48 +297,50 @@ impl Component for CompPDL {
 

	
 
        let run_result = self.execute_prompt(&sched_ctx);
 
        if let Err(error) = run_result {
 
            self.handle_component_error(sched_ctx, CompError::Executor(error));
 
            return CompScheduling::Immediate;
 
        }
 

	
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::BlockGet(expr_id, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                comp_ctx.get_port_mut(port_handle).last_instruction = PortInstruction::SourceLocation(expr_id);
 

	
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        let receive_result = component::default_handle_received_data_message(
 
                            port_id, PortInstruction::SourceLocation(expr_id),
 
                            &mut self.inbox_main[port_index], &mut self.inbox_backup,
 
                            comp_ctx, sched_ctx, &mut self.control
 
                        );
 
                        if let Err(location_and_message) = receive_result {
 
                            self.handle_generic_component_error(sched_ctx, location_and_message);
 
                            return CompScheduling::Immediate
 
                        } else {
 
                            self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                            return CompScheduling::Immediate;
 
                        }
 
                    } else {
 
                        todo!("handle sync failure due to message deadlock");
 
                        return CompScheduling::Sleep;
 
                    }
 
                } else {
 
@@ -557,49 +559,49 @@ impl CompPDL {
 
    }
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
    }
 

	
 
    /// Handles an error coming from the generic `component::handle_xxx`
 
    /// functions. Hence accepts argument as a tuple.
 
    fn handle_generic_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
 
        // Retrieve location and message, display in terminal
 
        let (location, message) = location_and_message;
 
        let error = match location {
 
            PortInstruction::None => CompError::Component(message),
 
            PortInstruction::NoSource => unreachable!(), // for debugging: all in-sync errors are associated with a source location
 
            PortInstruction::SourceLocation(expression_id) => {
 
                let protocol = &sched_ctx.runtime.protocol;
 
                CompError::Executor(EvalError::new_error_at_expr(
 
                    &self.prompt, &protocol.modules, &protocol.heap,
 
                    expression_id, message
 
                ))
 
            }
 
        };
 

	
 
        self.handle_component_error(sched_Ctx, error);
 
        self.handle_component_error(sched_ctx, error);
 
    }
 

	
 
    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, error: CompError) {
 
        sched_ctx.error(&format!("{}", error));
 

	
 
        // Set state to handle subsequent error
 
        let exit_reason = if self.exec_state.mode.is_in_sync_block() {
 
            ExitReason::ErrorInSync
 
        } else {
 
            ExitReason::ErrorNonSync
 
        };
 

	
 
        self.exec_state.set_as_start_exit(exit_reason);
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling ports
 
    // -------------------------------------------------------------------------
 

	
 
    /// Creates a new component and transfers ports. Because of the stepwise
 
    /// process in which memory is allocated, ports are transferred, messages
 
    /// are exchanged, component lifecycle methods are called, etc. This
 
    /// function facilitates a lot of implicit assumptions (e.g. when the
 
    /// `Component::on_creation` method is called, the component is already
src/runtime2/tests/error_handling.rs
Show inline comments
 
new file 100644
 
use super::*;
 

	
 
#[test]
 
fn test_unconnected_component_error() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive interact_with_noone() {
 
        u8[] array = { 5 };
 
        auto value = array[1];
 
    }
 
    ").unwrap();
 
    let rt = Runtime::new(1, true, pd).unwrap();
 
    create_component(&rt, "", "interact_with_noone", no_args());
 
}
 

	
 
#[test]
 
fn test_connected_uncommunicating_component_error() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive crashing_and_burning(out<u32> unused) {
 
        u8[] array = { 1337 };
 
        auto value = array[1337];
 
    }
 
    primitive sitting_idly_waiting(in<u32> never_providing) {
 
        sync auto a = get(never_providing);
 
    }
 
    composite constructor() {
 
        channel a -> b;
 
        new sitting_idly_waiting(b);
 
        new crashing_and_burning(a);
 
    }").unwrap();
 
    let rt = Runtime::new(1, true, pd).unwrap();
 
    create_component(&rt, "", "constructor", no_args());
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
use crate::protocol::*;
 
use crate::protocol::eval::*;
 
use crate::runtime2::runtime::*;
 
use crate::runtime2::component::{CompCtx, CompPDL};
 

	
 
fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
mod error_handling;
 

	
 
pub(crate) fn create_component(rt: &Runtime, module_name: &str, routine_name: &str, args: ValueGroup) {
 
    let prompt = rt.inner.protocol.new_component(
 
        module_name.as_bytes(), routine_name.as_bytes(), args
 
    ).expect("create prompt");
 
    let reserved = rt.inner.start_create_pdl_component();
 
    let ctx = CompCtx::new(&reserved);
 
    let component = Box::new(CompPDL::new(prompt, 0));
 
    let (key, _) = rt.inner.finish_create_pdl_component(reserved, component, ctx, false);
 
    rt.inner.enqueue_work(key);
 
}
 

	
 
fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 
pub(crate) fn no_args() -> ValueGroup { ValueGroup::new_stack(Vec::new()) }
 

	
 
#[test]
 
fn test_component_creation() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive nothing_at_all() {
 
        s32 a = 5;
 
        auto b = 5 + a;
 
    }
 
    ").expect("compilation");
 
    let rt = Runtime::new(1, true, pd).unwrap();
 

	
 
    for _i in 0..20 {
 
        create_component(&rt, "", "nothing_at_all", no_args());
 
    }
 
}
 

	
 
#[test]
 
fn test_component_communication() {
 
    let pd = ProtocolDescription::parse(b"
 
    primitive sender(out<u32> o, u32 outside_loops, u32 inside_loops) {
 
        u32 outside_index = 0;
 
        while (outside_index < outside_loops) {
 
            u32 inside_index = 0;
 
            sync while (inside_index < inside_loops) {
0 comments (0 inline, 0 general)