Changeset - 84e785174232
[Not reviewed]
0 1 0
mh - 3 years ago 2022-04-20 16:13:24
contact@maxhenger.nl
Removed some duplicated code in error handling
1 file changed with 9 insertions and 13 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -251,25 +251,25 @@ impl Component for CompPDL {
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                ) {
 
                    self.handle_component_error(sched_ctx, location_and_message);
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Poll => {
 
                unreachable!(); // because we never register at the polling thread
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
@@ -288,33 +288,25 @@ impl Component for CompPDL {
 
            }
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
 
            ),
 
            CompMode::BusyExit => return component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            ),
 
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx);
 
        if let Err(error) = run_result {
 
            // TODO: Cleanup before @nocommit
 
            sched_ctx.error(&format!("{}", error));
 
            let exit_reason = if self.exec_state.mode.is_in_sync_block() {
 
                ExitReason::ErrorInSync
 
            } else {
 
                ExitReason::ErrorNonSync
 
            };
 

	
 
            self.exec_state.set_as_start_exit(exit_reason);
 
            self.handle_component_error(sched_ctx, CompError::Executor(error));
 
            return CompScheduling::Immediate;
 
        }
 

	
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
@@ -330,25 +322,25 @@ impl Component for CompPDL {
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        let receive_result = component::default_handle_received_data_message(
 
                            port_id, PortInstruction::SourceLocation(expr_id),
 
                            &mut self.inbox_main[port_index], &mut self.inbox_backup,
 
                            comp_ctx, sched_ctx, &mut self.control
 
                        );
 
                        if let Err(location_and_message) = receive_result {
 
                            self.handle_component_error(sched_ctx, location_and_message);
 
                            self.handle_generic_component_error(sched_ctx, location_and_message);
 
                            return CompScheduling::Immediate
 
                        } else {
 
                            self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                            return CompScheduling::Immediate;
 
                        }
 
                    } else {
 
                        todo!("handle sync failure due to message deadlock");
 
                        return CompScheduling::Sleep;
 
                    }
 
                } else {
 
                    // We need to wait
 
                    self.exec_state.set_as_blocked_get(port_id);
 
@@ -358,25 +350,25 @@ impl Component for CompPDL {
 
            EC::Put(expr_id, port_id, value) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 

	
 
                // Send the message
 
                let target_port_id = port_id_from_eval(port_id);
 
                let send_result = component::default_send_data_message(
 
                    &mut self.exec_state, target_port_id,
 
                    PortInstruction::SourceLocation(expr_id), value,
 
                    sched_ctx, &mut self.consensus, comp_ctx
 
                );
 
                if let Err(location_and_message) = send_result {
 
                    self.handle_component_error(sched_ctx, location_and_message);
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // When `run` is called again (potentially after becoming
 
                    // unblocked) we need to instruct the executor that we performed
 
                    // the `put`
 
                    let scheduling = send_result.unwrap();
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                    return scheduling;
 
                }
 
            },
 
            EC::SelectStart(num_cases, _num_ports) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
@@ -562,39 +554,43 @@ impl CompPDL {
 
                self.inbox_backup.push(message);
 
            }
 
        }
 
    }
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
    }
 

	
 
    /// Handles an error coming from the generic `component::handle_xxx`
 
    /// functions. Hence accepts argument as a tuple.
 
    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
 
    fn handle_generic_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
 
        // Retrieve location and message, display in terminal
 
        let (location, message) = location_and_message;
 
        let error = match location {
 
            PortInstruction::None => CompError::Component(message),
 
            PortInstruction::NoSource => unreachable!(), // for debugging: all in-sync errors are associated with a source location
 
            PortInstruction::SourceLocation(expression_id) => {
 
                let protocol = &sched_ctx.runtime.protocol;
 
                CompError::Executor(EvalError::new_error_at_expr(
 
                    &self.prompt, &protocol.modules, &protocol.heap,
 
                    expression_id, message
 
                ))
 
            }
 
        };
 

	
 
        self.handle_component_error(sched_Ctx, error);
 
    }
 

	
 
    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, error: CompError) {
 
        sched_ctx.error(&format!("{}", error));
 

	
 
        // Set state to handle subsequent error
 
        let exit_reason = if self.exec_state.mode.is_in_sync_block() {
 
            ExitReason::ErrorInSync
 
        } else {
 
            ExitReason::ErrorNonSync
 
        };
 

	
 
        self.exec_state.set_as_start_exit(exit_reason);
 
    }
 

	
0 comments (0 inline, 0 general)