Changeset - 84e785174232
[Not reviewed]
0 1 0
mh - 3 years ago 2022-04-20 16:13:24
contact@maxhenger.nl
Removed some duplicated code in error handling
1 file changed with 9 insertions and 13 deletions:
0 comments (0 inline, 0 general)
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -215,204 +215,196 @@ pub(crate) struct CompPDL {
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: InboxMain,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
 

	
 
impl Component for CompPDL {
 
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {
 
        // Intentionally empty
 
    }
 

	
 
    fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) {
 
        // Intentionally empty
 
    }
 

	
 
    fn adopt_message(&mut self, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        if self.inbox_main[port_index].is_none() {
 
            self.inbox_main[port_index] = Some(message);
 
        } else {
 
            self.inbox_backup.push(message);
 
        }
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, mut message: Message) {
 
        // sched_ctx.log(&format!("handling message: {:?}", message));
 
        if let Some(new_target) = self.control.should_reroute(&mut message) {
 
            let mut target = sched_ctx.runtime.get_component_public(new_target); // TODO: @NoDirectHandle
 
            target.send_message(&sched_ctx.runtime, message, false); // not waking up: we schedule once we've received all PortPeerChanged Acks
 
            let _should_remove = target.decrement_users();
 
            debug_assert!(_should_remove.is_none());
 
            return;
 
        }
 

	
 
        match message {
 
            Message::Data(message) => {
 
                self.handle_incoming_data_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Control(message) => {
 
                if let Err(location_and_message) = component::default_handle_control_message(
 
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
 
                    message, sched_ctx, comp_ctx
 
                ) {
 
                    self.handle_component_error(sched_ctx, location_and_message);
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                }
 
            },
 
            Message::Sync(message) => {
 
                self.handle_incoming_sync_message(sched_ctx, comp_ctx, message);
 
            },
 
            Message::Poll => {
 
                unreachable!(); // because we never register at the polling thread
 
            }
 
        }
 
    }
 

	
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
 
        use EvalContinuation as EC;
 

	
 
        sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode));
 

	
 
        // Depending on the mode don't do anything at all, take some special
 
        // actions, or fall through and run the PDL code.
 
        match self.exec_state.mode {
 
            CompMode::NonSync | CompMode::Sync => {
 
                // continue and run PDL code
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => {
 
                return CompScheduling::Sleep;
 
            }
 
            CompMode::StartExit => return component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
 
            ),
 
            CompMode::BusyExit => return component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            ),
 
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
 
        }
 

	
 
        let run_result = self.execute_prompt(&sched_ctx);
 
        if let Err(error) = run_result {
 
            // TODO: Cleanup before @nocommit
 
            sched_ctx.error(&format!("{}", error));
 
            let exit_reason = if self.exec_state.mode.is_in_sync_block() {
 
                ExitReason::ErrorInSync
 
            } else {
 
                ExitReason::ErrorNonSync
 
            };
 

	
 
            self.exec_state.set_as_start_exit(exit_reason);
 
            self.handle_component_error(sched_ctx, CompError::Executor(error));
 
            return CompScheduling::Immediate;
 
        }
 

	
 
        let run_result = run_result.unwrap();
 

	
 
        match run_result {
 
            EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned
 
            EC::BranchInconsistent | EC::NewFork | EC::BlockFires(_) => todo!("remove these"),
 
            // Results that can be returned in sync mode
 
            EC::SyncBlockEnd => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.handle_sync_end(sched_ctx, comp_ctx);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::BlockGet(expr_id, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                debug_assert!(self.exec_ctx.stmt.is_none());
 

	
 
                let port_id = port_id_from_eval(port_id);
 
                let port_handle = comp_ctx.get_port_handle(port_id);
 
                let port_index = comp_ctx.get_port_index(port_handle);
 
                if let Some(message) = &self.inbox_main[port_index] {
 
                    // Check if we can actually receive the message
 
                    if self.consensus.try_receive_data_message(sched_ctx, comp_ctx, message) {
 
                        // Message was received. Make sure any blocked peers and
 
                        // pending messages are handled.
 
                        let message = self.inbox_main[port_index].take().unwrap();
 
                        let receive_result = component::default_handle_received_data_message(
 
                            port_id, PortInstruction::SourceLocation(expr_id),
 
                            &mut self.inbox_main[port_index], &mut self.inbox_backup,
 
                            comp_ctx, sched_ctx, &mut self.control
 
                        );
 
                        if let Err(location_and_message) = receive_result {
 
                            self.handle_component_error(sched_ctx, location_and_message);
 
                            self.handle_generic_component_error(sched_ctx, location_and_message);
 
                            return CompScheduling::Immediate
 
                        } else {
 
                            self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content);
 
                            return CompScheduling::Immediate;
 
                        }
 
                    } else {
 
                        todo!("handle sync failure due to message deadlock");
 
                        return CompScheduling::Sleep;
 
                    }
 
                } else {
 
                    // We need to wait
 
                    self.exec_state.set_as_blocked_get(port_id);
 
                    return CompScheduling::Sleep;
 
                }
 
            },
 
            EC::Put(expr_id, port_id, value) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                sched_ctx.log(&format!("Putting value {:?}", value));
 

	
 
                // Send the message
 
                let target_port_id = port_id_from_eval(port_id);
 
                let send_result = component::default_send_data_message(
 
                    &mut self.exec_state, target_port_id,
 
                    PortInstruction::SourceLocation(expr_id), value,
 
                    sched_ctx, &mut self.consensus, comp_ctx
 
                );
 
                if let Err(location_and_message) = send_result {
 
                    self.handle_component_error(sched_ctx, location_and_message);
 
                    self.handle_generic_component_error(sched_ctx, location_and_message);
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // When `run` is called again (potentially after becoming
 
                    // unblocked) we need to instruct the executor that we performed
 
                    // the `put`
 
                    let scheduling = send_result.unwrap();
 
                    self.exec_ctx.stmt = ExecStmt::PerformedPut;
 
                    return scheduling;
 
                }
 
            },
 
            EC::SelectStart(num_cases, _num_ports) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                self.select_state.handle_select_start(num_cases);
 
                return CompScheduling::Requeue;
 
            },
 
            EC::SelectRegisterPort(case_index, port_index, port_id) => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let port_id = port_id_from_eval(port_id);
 
                if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) {
 
                    todo!("handle registering a port multiple times");
 
                }
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SelectWait => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::Sync);
 
                let select_decision = self.select_state.handle_select_waiting_point(&self.inbox_main, comp_ctx);
 
                if let SelectDecision::Case(case_index) = select_decision {
 
                    // Reached a conclusion, so we can continue immediately
 
                    self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                    self.exec_state.mode = CompMode::Sync;
 
                    return CompScheduling::Immediate;
 
                } else {
 
                    // No decision yet
 
                    self.exec_state.mode = CompMode::BlockedSelect;
 
                    return CompScheduling::Sleep;
 
                }
 
            },
 
            // Results that can be returned outside of sync mode
 
            EC::ComponentTerminated => {
 
                self.exec_state.set_as_start_exit(ExitReason::Termination);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::SyncBlockStart => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::NonSync);
 
                self.handle_sync_start(sched_ctx, comp_ctx);
 
                return CompScheduling::Immediate;
 
            },
 
            EC::NewComponent(definition_id, type_id, arguments) => {
 
@@ -526,111 +518,115 @@ impl CompPDL {
 
            let peer_info = comp_ctx.get_peer(peer);
 
            peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true);
 
        }
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling messages
 
    // -------------------------------------------------------------------------
 

	
 
    /// Handles a message that came in through the public inbox. This function
 
    /// will handle putting it in the correct place, and potentially blocking
 
    /// the port in case too many messages are being received.
 
    fn handle_incoming_data_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: DataMessage) {
 
        use component::IncomingData;
 

	
 
        // Whatever we do, glean information from headers in message
 
        if self.exec_state.mode.is_in_sync_block() {
 
            self.consensus.handle_incoming_data_message(comp_ctx, &message);
 
        }
 

	
 
        let port_handle = comp_ctx.get_port_handle(message.data_header.target_port);
 
        let port_index = comp_ctx.get_port_index(port_handle);
 
        match component::default_handle_incoming_data_message(
 
            &mut self.exec_state, &mut self.inbox_main[port_index], comp_ctx, message,
 
            sched_ctx, &mut self.control
 
        ) {
 
            IncomingData::PlacedInSlot => {
 
                if self.exec_state.mode == CompMode::BlockedSelect {
 
                    let select_decision = self.select_state.handle_updated_inbox(&self.inbox_main, comp_ctx);
 
                    if let SelectDecision::Case(case_index) = select_decision {
 
                        self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index);
 
                        self.exec_state.mode = CompMode::Sync;
 
                    }
 
                }
 
            },
 
            IncomingData::SlotFull(message) => {
 
                self.inbox_backup.push(message);
 
            }
 
        }
 
    }
 

	
 
    fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) {
 
        let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
 
        component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus);
 
    }
 

	
 
    /// Handles an error coming from the generic `component::handle_xxx`
 
    /// functions. Hence accepts argument as a tuple.
 
    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
 
    fn handle_generic_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) {
 
        // Retrieve location and message, display in terminal
 
        let (location, message) = location_and_message;
 
        let error = match location {
 
            PortInstruction::None => CompError::Component(message),
 
            PortInstruction::NoSource => unreachable!(), // for debugging: all in-sync errors are associated with a source location
 
            PortInstruction::SourceLocation(expression_id) => {
 
                let protocol = &sched_ctx.runtime.protocol;
 
                CompError::Executor(EvalError::new_error_at_expr(
 
                    &self.prompt, &protocol.modules, &protocol.heap,
 
                    expression_id, message
 
                ))
 
            }
 
        };
 

	
 
        self.handle_component_error(sched_Ctx, error);
 
    }
 

	
 
    fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, error: CompError) {
 
        sched_ctx.error(&format!("{}", error));
 

	
 
        // Set state to handle subsequent error
 
        let exit_reason = if self.exec_state.mode.is_in_sync_block() {
 
            ExitReason::ErrorInSync
 
        } else {
 
            ExitReason::ErrorNonSync
 
        };
 

	
 
        self.exec_state.set_as_start_exit(exit_reason);
 
    }
 

	
 
    // -------------------------------------------------------------------------
 
    // Handling ports
 
    // -------------------------------------------------------------------------
 

	
 
    /// Creates a new component and transfers ports. Because of the stepwise
 
    /// process in which memory is allocated, ports are transferred, messages
 
    /// are exchanged, component lifecycle methods are called, etc. This
 
    /// function facilitates a lot of implicit assumptions (e.g. when the
 
    /// `Component::on_creation` method is called, the component is already
 
    /// registered at the runtime).
 
    fn create_component_and_transfer_ports(
 
        &mut self,
 
        sched_ctx: &SchedulerCtx, creator_ctx: &mut CompCtx,
 
        definition_id: ProcedureDefinitionId, type_id: TypeId, mut arguments: ValueGroup
 
    ) {
 
        struct PortPair{
 
            creator_handle: LocalPortHandle,
 
            creator_id: PortId,
 
            created_handle: LocalPortHandle,
 
            created_id: PortId,
 
        }
 
        let mut opened_port_id_pairs = Vec::new();
 
        let mut closed_port_id_pairs = Vec::new();
 

	
 
        let reservation = sched_ctx.runtime.start_create_pdl_component();
 
        let mut created_ctx = CompCtx::new(&reservation);
 

	
 
        let other_proc = &sched_ctx.runtime.protocol.heap[definition_id];
 
        let self_proc = &sched_ctx.runtime.protocol.heap[self.prompt.frames[0].definition];
 

	
 
        // dbg_code!({
 
        //     sched_ctx.log(&format!(
 
        //         "DEBUG: Comp '{}' (ID {:?}) is creating comp '{}' (ID {:?})",
 
        //         self_proc.identifier.value.as_str(), creator_ctx.id,
 
        //         other_proc.identifier.value.as_str(), reservation.id()
 
        //     ));
0 comments (0 inline, 0 general)