Changeset - 8da52bdbcaa7
[Not reviewed]
0 7 0
MH - 3 years ago 2022-04-22 11:32:53
contact@maxhenger.nl
Initial extension of error-handling to detect pre-sync errors
7 files changed with 67 insertions and 30 deletions:
0 comments (0 inline, 0 general)
src/protocol/eval/executor.rs
Show inline comments
 
@@ -725,13 +725,13 @@ impl Prompt {
 
                                    }
 
                                },
 
                                Method::Print => {
 
                                    // Convert the runtime-variant of a string
 
                                    // into an actual string.
 
                                    let value = cur_frame.expr_values.pop_front().unwrap();
 
                                    let mut is_literal_string = value.get_heap_pos().is_some();
 
                                    let is_literal_string = value.get_heap_pos().is_some();
 
                                    let value = self.store.maybe_read_ref(&value);
 
                                    let value_heap_pos = value.as_string();
 
                                    let elements = &self.store.heap_regions[value_heap_pos as usize].values;
 

	
 
                                    let mut message = String::with_capacity(elements.len());
 
                                    for element in elements {
src/protocol/parser/pass_rewriting.rs
Show inline comments
 
@@ -229,16 +229,18 @@ impl Visitor for PassRewriting {
 
            let mut total_port_index = 0;
 
            for case_index in 0..total_num_cases {
 
                let case = &ctx.heap[id].cases[case_index];
 
                let case_num_ports = case.involved_ports.len();
 

	
 
                for case_port_index in 0..case_num_ports {
 
                    // Arguments to runtime call
 
                    let original_get_call_id = case.involved_ports[case_port_index].0;
 
                    // Retrieve original span in case of error reporting for the
 
                    // inserted function call
 
                    let original_get_call_id = ctx.heap[id].cases[case_index].involved_ports[case_port_index].0;
 
                    let original_get_call_span = ctx.heap[original_get_call_id].full_span;
 

	
 
                    // Arguments to runtime call
 
                    let (port_variable_id, port_variable_type) = locals[total_port_index]; // so far this variable contains the temporary variables for the port expressions
 
                    let case_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_index as u64, ctx.arch.uint32_type_id);
 
                    let port_index_expr_id = create_ast_literal_integer_expr(ctx, self.current_procedure_id, case_port_index as u64, ctx.arch.uint32_type_id);
 
                    let port_variable_expr_id = create_ast_variable_expr(ctx, self.current_procedure_id, port_variable_id, port_variable_type);
 
                    let runtime_call_arguments = vec![
 
                        case_index_expr_id.upcast(),
src/runtime2/component/component.rs
Show inline comments
 
@@ -64,13 +64,15 @@ pub(crate) trait Component {
 

	
 
    /// Called if the component's routine should be executed. The return value
 
    /// can be used to indicate when the routine should be run again.
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling;
 
}
 

	
 
/// Representation of the generic operating mode of a component.
 
/// Representation of the generic operating mode of a component. Although not
 
/// every state may be used by every kind of (builtin) component, this allows
 
/// writing standard handlers for particular events in a component's lifetime.
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub(crate) enum CompMode {
 
    NonSync, // not in sync mode
 
    Sync, // in sync mode, can interact with other components
 
    SyncEnd, // awaiting a solution, i.e. encountered the end of the sync block
 
    BlockedGet, // blocked because we need to receive a message on a particular port
 
@@ -176,12 +178,20 @@ impl CompExecState {
 
        return
 
            self.mode == CompMode::BlockedPut &&
 
            self.mode_port == port;
 
    }
 
}
 

	
 
/// Generic representation of a component's inbox.
 
// NOTE: It would be nice to remove the `backup` at some point and make the
 
// blocking occur the moment a peer tries to send a message.
 
pub(crate) struct CompInbox {
 
    main: Vec<Option<DataMessage>>,
 
    backup: Vec<DataMessage>,
 
}
 

	
 
/// Creates a new component based on its definition. Meaning that if it is a
 
/// user-defined component then we set up the PDL code state. Otherwise we
 
/// construct a custom component. This does NOT take care of port and message
 
/// management.
 
pub(crate) fn create_component(
 
    protocol: &ProtocolDescription,
 
@@ -406,34 +416,44 @@ pub(crate) fn default_handle_control_message(
 

	
 
            // One exception to sending an `Ack` is if we just closed the
 
            // port ourselves, meaning that the `ClosePort` messages got
 
            // sent to one another.
 
            if let Some(control_id) = control.has_close_port_entry(port_handle, comp_ctx) {
 
                // The two components (sender and this component) are closing
 
                // the channel at the same time.
 
                // the channel at the same time. So we don't care about the
 
                // content of the `ClosePort` message.
 
                default_handle_ack(control, control_id, sched_ctx, comp_ctx);
 
            } else {
 
                // Respond to the message
 
                let last_instruction = port_info.last_instruction;
 
                let port_was_used = last_instruction != PortInstruction::None;
 
                let port_has_had_message = port_info.received_message_for_sync;
 
                default_send_ack(message.id, peer_handle, sched_ctx, comp_ctx);
 
                comp_ctx.remove_peer(sched_ctx, port_handle, peer_comp_id, false); // do not remove if closed
 
                comp_ctx.set_port_state(port_handle, PortState::Closed); // now set to closed
 

	
 
                // Make sure that we've not reached an error condition. Note
 
                // that if this condition is not met, then we don't error out
 
                // now, but we may error out in the next sync block when we
 
                // try to `put`/`get` on the port. This condition makes sure
 
                // that if we have a successful sync round, followed by the peer
 
                // closing the port, that we don't consider the sync round to
 
                // have failed by mistake.
 
                if content.closed_in_sync_round && exec_state.mode.is_in_sync_block() && port_was_used {
 
                    return Err((
 
                        last_instruction,
 
                        format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0)
 
                    ));
 

	
 
                // Handle any possible error conditions (which boil down to: the
 
                // port has been used, but the peer has died). If not in sync
 
                // mode then we close the port immediately.
 

	
 
                // Note that `port_was_used` does not mean that any messages
 
                // were actually received. It might also mean that e.g. the
 
                // component attempted a `get`, but there were no messages, so
 
                // now it is in the `BlockedGet` state.
 
                let port_was_used = last_instruction != PortInstruction::None;
 

	
 
                if exec_state.mode.is_in_sync_block() {
 
                    let closed_during_sync_round = content.closed_in_sync_round && port_was_used;
 
                    // TODO: Finish this
 

	
 
                    if closed_during_sync_round {
 
                        return Err((
 
                            last_instruction,
 
                            format!("Peer component (id:{}) shut down, so previous communication cannot have succeeded", peer_comp_id.0)
 
                        ));
 
                    }
 
                } else {
 
                    comp_ctx.set_port_state(port_handle, PortState::Closed);
 
                }
 
            }
 
        },
 
        ControlMessageContent::UnblockPort(port_id) => {
 
            // We were previously blocked (or already closed)
 
            let port_handle = comp_ctx.get_port_handle(port_id);
 
@@ -473,12 +493,18 @@ pub(crate) fn default_handle_control_message(
 
        }
 
    }
 

	
 
    return Ok(());
 
}
 

	
 
/// Handles a component entering the synchronous block. Will ensure that the
 
/// `Consensus` and the `ComponentCtx` are initialized properly.
 
pub(crate) fn default_handle_start_sync(
 
    exec_state: &mut CompExecState,
 
) {}
 

	
 
/// Handles a component initiating the exiting procedure, and closing all of its
 
/// ports. Should only be called once per component (which is ensured by
 
/// checking and modifying the mode in the execution state).
 
#[must_use]
 
pub(crate) fn default_handle_start_exit(
 
    exec_state: &mut CompExecState, control: &mut ControlLayer,
 
@@ -490,13 +516,13 @@ pub(crate) fn default_handle_start_exit(
 
    let exit_inside_sync = exec_state.exit_reason.is_in_sync();
 

	
 
    // If exiting while inside sync mode, report to the leader of the current
 
    // round that we've failed.
 
    if exit_inside_sync {
 
        let decision = consensus.notify_sync_end_failure(sched_ctx, comp_ctx);
 
        default_handle_sync_decision(sched_ctx, exec_state, decision, consensus);
 
        default_handle_sync_decision(sched_ctx, exec_state, comp_ctx, decision, consensus);
 
    }
 

	
 
    // Iterating over ports by index to work around borrowing rules
 
    for port_index in 0..comp_ctx.num_ports() {
 
        let port = comp_ctx.get_port_by_index_mut(port_index);
 
        if port.state == PortState::Closed {
src/runtime2/component/component_context.rs
Show inline comments
 
@@ -11,19 +11,25 @@ pub enum PortInstruction {
 
    NoSource,
 
    SourceLocation(ExpressionId),
 
}
 

	
 
#[derive(Debug)]
 
pub struct Port {
 
    // Identifiers
 
    pub self_id: PortId,
 
    pub peer_comp_id: CompId, // eventually consistent
 
    pub peer_port_id: PortId, // eventually consistent
 
    // Generic operating state
 
    pub kind: PortKind,
 
    pub state: PortState,
 
    pub last_instruction: PortInstruction, // used during sync round in case port ends up being closed for error reporting
 
    pub close_at_sync_end: bool, // used during sync round when receiving a `ClosePort(not_in_sync_round)` message
 
    // State tracking for error detection and error handling
 
    pub last_instruction: PortInstruction, // used during sync round to detect port-closed-during-sync errors
 
    pub received_message_for_sync: bool, // used during sync round to detect port-closed-before-sync errors
 
    pub close_at_sync_end: bool, // set during sync round when receiving a port-closed-after-sync message
 
    // Debugging flag to make sure each port is properly associated and
 
    // disassociated with a peer component
 
    #[cfg(debug_assertions)] pub(crate) associated_with_peer: bool,
 
}
 

	
 
pub struct Peer {
 
    pub id: CompId,
 
    pub num_associated_ports: u32,
 
@@ -69,22 +75,24 @@ impl CompCtx {
 
            peer_port_id: getter_id,
 
            kind: PortKind::Putter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 
        self.ports.push(Port{
 
            self_id: getter_id,
 
            peer_port_id: putter_id,
 
            kind: PortKind::Getter,
 
            state: PortState::Open,
 
            peer_comp_id: self.id,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 

	
 
        return Channel{ putter_id, getter_id };
 
    }
 

	
 
@@ -92,12 +100,13 @@ impl CompCtx {
 
    pub(crate) fn add_port(&mut self, peer_comp_id: CompId, peer_port_id: PortId, kind: PortKind, state: PortState) -> LocalPortHandle {
 
        let self_id = PortId(self.take_port_id());
 
        self.ports.push(Port{
 
            self_id, peer_comp_id, peer_port_id, kind, state,
 
            last_instruction: PortInstruction::None,
 
            close_at_sync_end: false,
 
            received_message_for_sync: false,
 
            #[cfg(debug_assertions)] associated_with_peer: false,
 
        });
 
        return LocalPortHandle(self_id);
 
    }
 

	
 
    /// Removes a port. Make sure you called `remove_peer` first.
src/runtime2/component/component_internet.rs
Show inline comments
 
@@ -101,13 +101,13 @@ impl Component for ComponentTcpClient {
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                ) {
 
@@ -235,13 +235,13 @@ impl Component for ComponentTcpClient {
 
                        }
 

	
 
                        // If here then we're done putting the data, we can
 
                        // finish the sync round
 
                        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
                        self.exec_state.mode = CompMode::SyncEnd;
 
                        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
                        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
                        return CompScheduling::Immediate;
 
                    },
 
                    SyncState::Getting => {
 
                        // We're going to try and receive a single message. If
 
                        // this causes us to end up blocking the component
 
                        // goes to sleep until it is polled.
 
@@ -272,13 +272,13 @@ impl Component for ComponentTcpClient {
 
                            }
 
                        }
 
                    },
 
                    SyncState::FinishSync | SyncState::FinishSyncThenQuit => {
 
                        let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
                        self.exec_state.mode = CompMode::SyncEnd;
 
                        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
                        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
                        return CompScheduling::Requeue;
 
                    },
 
                }
 
            },
 
            CompMode::BlockedGet => {
 
                // Entered when awaiting a new command
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -344,13 +344,13 @@ impl Component for CompPDL {
 
                        }
 
                    } else {
 
                        let protocol = &sched_ctx.runtime.protocol;
 
                        self.handle_component_error(sched_ctx, CompError::Executor(EvalError::new_error_at_expr(
 
                            &self.prompt, &protocol.modules, &protocol.heap, expr_id,
 
                            String::from("Cannot get from this port, as this causes a deadlock. This happens if you `get` in a different order as another component `put`s")
 
                        ));
 
                        )));
 
                        return CompScheduling::Sleep;
 
                    }
 
                } else if port_is_closed {
 
                    // No messages, but getting makes no sense as the port is
 
                    // closed.
 
                    let peer_id = comp_ctx.get_port(port_handle).peer_comp_id;
 
@@ -598,13 +598,13 @@ impl CompPDL {
 
            }
 
        }
 
    }
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
    }
 

	
 
    /// Handles an error coming from the generic `component::handle_xxx`
 
    /// functions. Hence accepts argument as a tuple.
 
    fn handle_generic_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
 
        // Retrieve location and message, display in terminal
src/runtime2/component/component_random.rs
Show inline comments
 
@@ -43,13 +43,13 @@ impl Component for ComponentRandomU32 {
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
        match message {
 
            Message::Data(_message) => unreachable!(),
 
            Message::Sync(message) => {
 
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                ) {
 
@@ -120,13 +120,13 @@ impl Component for ComponentRandomU32 {
 
                    }
 
                } else {
 
                    // Message was sent, finish this sync round
 
                    sched_ctx.log("Waiting for consensus");
 
                    self.exec_state.mode = CompMode::SyncEnd;
 
                    let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx);
 
                    component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
                    component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
 
                    return CompScheduling::Requeue;
 
                }
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut => return CompScheduling::Sleep,
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
0 comments (0 inline, 0 general)