diff --git a/src/runtime2/component/component_pdl.rs b/src/runtime2/component/component_pdl.rs index dd3081ee1d12512335b33c706a690cfa21fc7612..46424fe798589d6830b7a9877dfb6cf8e960a2bb 100644 --- a/src/runtime2/component/component_pdl.rs +++ b/src/runtime2/component/component_pdl.rs @@ -13,7 +13,7 @@ use crate::runtime2::communication::*; use super::component::{ self, - CompExecState, Component, CompScheduling, CompMode, + CompExecState, Component, CompScheduling, CompError, CompMode, ExitReason, port_id_from_eval, port_id_to_eval }; use super::component_context::*; @@ -256,10 +256,12 @@ impl Component for CompPDL { self.handle_incoming_data_message(sched_ctx, comp_ctx, message); }, Message::Control(message) => { - component::default_handle_control_message( + if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, message, sched_ctx, comp_ctx - ); + ) { + self.handle_component_error(sched_ctx, location_and_message); + } }, Message::Sync(message) => { self.handle_incoming_sync_message(sched_ctx, comp_ctx, message); @@ -270,7 +272,7 @@ impl Component for CompPDL { } } - fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { use EvalContinuation as EC; sched_ctx.log(&format!("Running component (mode: {:?})", self.exec_state.mode)); @@ -282,18 +284,32 @@ impl Component for CompPDL { // continue and run PDL code }, CompMode::SyncEnd | CompMode::BlockedGet | CompMode::BlockedPut | CompMode::BlockedSelect => { - return Ok(CompScheduling::Sleep); + return CompScheduling::Sleep; } - CompMode::StartExit => return Ok(component::default_handle_start_exit( - &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx - )), - CompMode::BusyExit => return Ok(component::default_handle_busy_exit( + CompMode::StartExit => return component::default_handle_start_exit( + &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus + ), + CompMode::BusyExit => return component::default_handle_busy_exit( &mut self.exec_state, &self.control, sched_ctx - )), - CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), + ), + CompMode::Exit => return component::default_handle_exit(&self.exec_state), + } + + let run_result = self.execute_prompt(&sched_ctx); + if let Err(error) = run_result { + // TODO: Cleanup before @nocommit + sched_ctx.error(&format!("{}", error)); + let exit_reason = if self.exec_state.mode.is_in_sync_block() { + ExitReason::ErrorInSync + } else { + ExitReason::ErrorNonSync + }; + + self.exec_state.set_as_start_exit(exit_reason); + return CompScheduling::Immediate; } - let run_result = self.execute_prompt(&sched_ctx)?; + let run_result = run_result.unwrap(); match run_result { EC::Stepping => unreachable!(), // execute_prompt runs until this is no longer returned @@ -302,9 +318,9 @@ impl Component for CompPDL { EC::SyncBlockEnd => { debug_assert_eq!(self.exec_state.mode, CompMode::Sync); self.handle_sync_end(sched_ctx, comp_ctx); - return Ok(CompScheduling::Immediate); + return CompScheduling::Immediate; }, - EC::BlockGet(port_id) => { + EC::BlockGet(expr_id, port_id) => { debug_assert_eq!(self.exec_state.mode, CompMode::Sync); debug_assert!(self.exec_ctx.stmt.is_none()); @@ -317,44 +333,55 @@ impl Component for CompPDL { // Message was received. Make sure any blocked peers and // pending messages are handled. let message = self.inbox_main[port_index].take().unwrap(); - component::default_handle_received_data_message( - port_id, &mut self.inbox_main[port_index], &mut self.inbox_backup, + let receive_result = component::default_handle_received_data_message( + port_id, PortInstruction::SourceLocation(expr_id), + &mut self.inbox_main[port_index], &mut self.inbox_backup, comp_ctx, sched_ctx, &mut self.control ); - - self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); - return Ok(CompScheduling::Immediate); + if let Err(location_and_message) = receive_result { + self.handle_component_error(sched_ctx, location_and_message); + return CompScheduling::Immediate + } else { + self.exec_ctx.stmt = ExecStmt::PerformedGet(message.content); + return CompScheduling::Immediate; + } } else { todo!("handle sync failure due to message deadlock"); - return Ok(CompScheduling::Sleep); + return CompScheduling::Sleep; } } else { // We need to wait self.exec_state.set_as_blocked_get(port_id); - return Ok(CompScheduling::Sleep); + return CompScheduling::Sleep; } }, - EC::Put(port_id, value) => { + EC::Put(expr_id, port_id, value) => { debug_assert_eq!(self.exec_state.mode, CompMode::Sync); sched_ctx.log(&format!("Putting value {:?}", value)); // Send the message let target_port_id = port_id_from_eval(port_id); - let scheduling = component::default_send_data_message( - &mut self.exec_state, target_port_id, value, + let send_result = component::default_send_data_message( + &mut self.exec_state, target_port_id, + PortInstruction::SourceLocation(expr_id), value, sched_ctx, &mut self.consensus, comp_ctx ); - - // When `run` is called again (potentially after becoming - // unblocked) we need to instruct the executor that we performed - // the `put` - self.exec_ctx.stmt = ExecStmt::PerformedPut; - return Ok(scheduling); + if let Err(location_and_message) = send_result { + self.handle_component_error(sched_ctx, location_and_message); + return CompScheduling::Immediate; + } else { + // When `run` is called again (potentially after becoming + // unblocked) we need to instruct the executor that we performed + // the `put` + let scheduling = send_result.unwrap(); + self.exec_ctx.stmt = ExecStmt::PerformedPut; + return scheduling; + } }, EC::SelectStart(num_cases, _num_ports) => { debug_assert_eq!(self.exec_state.mode, CompMode::Sync); self.select_state.handle_select_start(num_cases); - return Ok(CompScheduling::Requeue); + return CompScheduling::Requeue; }, EC::SelectRegisterPort(case_index, port_index, port_id) => { debug_assert_eq!(self.exec_state.mode, CompMode::Sync); @@ -362,7 +389,7 @@ impl Component for CompPDL { if let Err(_err) = self.select_state.register_select_case_port(comp_ctx, case_index, port_index, port_id) { todo!("handle registering a port multiple times"); } - return Ok(CompScheduling::Immediate); + return CompScheduling::Immediate; }, EC::SelectWait => { debug_assert_eq!(self.exec_state.mode, CompMode::Sync); @@ -371,22 +398,22 @@ impl Component for CompPDL { // Reached a conclusion, so we can continue immediately self.exec_ctx.stmt = ExecStmt::PerformedSelectWait(case_index); self.exec_state.mode = CompMode::Sync; - return Ok(CompScheduling::Immediate); + return CompScheduling::Immediate; } else { // No decision yet self.exec_state.mode = CompMode::BlockedSelect; - return Ok(CompScheduling::Sleep); + return CompScheduling::Sleep; } }, // Results that can be returned outside of sync mode EC::ComponentTerminated => { - self.exec_state.mode = CompMode::StartExit; // next call we'll take care of the exit - return Ok(CompScheduling::Immediate); + self.exec_state.set_as_start_exit(ExitReason::Termination); + return CompScheduling::Immediate; }, EC::SyncBlockStart => { debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); self.handle_sync_start(sched_ctx, comp_ctx); - return Ok(CompScheduling::Immediate); + return CompScheduling::Immediate; }, EC::NewComponent(definition_id, type_id, arguments) => { debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); @@ -394,7 +421,7 @@ impl Component for CompPDL { sched_ctx, comp_ctx, definition_id, type_id, arguments ); - return Ok(CompScheduling::Requeue); + return CompScheduling::Requeue; }, EC::NewChannel => { debug_assert_eq!(self.exec_state.mode, CompMode::NonSync); @@ -406,7 +433,7 @@ impl Component for CompPDL { )); self.inbox_main.push(None); self.inbox_main.push(None); - return Ok(CompScheduling::Immediate); + return CompScheduling::Immediate; } } } @@ -469,37 +496,16 @@ impl CompPDL { /// immediately fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { sched_ctx.log("Component ending sync mode (now waiting for solution)"); - let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); self.exec_state.mode = CompMode::SyncEnd; - self.handle_sync_decision(sched_ctx, comp_ctx, decision); - } - - /// Handles decision from the consensus round. This will cause a change in - /// the internal `Mode`, such that the next call to `run` can take the - /// appropriate next steps. - fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) { - sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.exec_state.mode)); - match decision { - SyncRoundDecision::None => { - // No decision yet - return; - }, - SyncRoundDecision::Solution => { - debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd); - self.exec_state.mode = CompMode::NonSync; - self.consensus.notify_sync_decision(decision); - }, - SyncRoundDecision::Failure => { - debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd); - self.exec_state.mode = CompMode::StartExit; - }, - } + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); } fn handle_component_exit(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) { - sched_ctx.log("Component exiting"); + sched_ctx.log(&format!("Component exiting (reason: {:?}", self.exec_state.exit_reason)); debug_assert_eq!(self.exec_state.mode, CompMode::StartExit); self.exec_state.mode = CompMode::BusyExit; + let exit_inside_sync = self.exec_state.exit_reason.is_in_sync(); // Doing this by index, then retrieving the handle is a bit rediculous, // but Rust is being Rust with its borrowing rules. @@ -516,7 +522,7 @@ impl CompPDL { // Notify peer of closing let port_handle = comp_ctx.get_port_handle(port_id); - let (peer, message) = self.control.initiate_port_closing(port_handle, comp_ctx); + let (peer, message) = self.control.initiate_port_closing(port_handle, exit_inside_sync, comp_ctx); let peer_info = comp_ctx.get_peer(peer); peer_info.handle.send_message(&sched_ctx.runtime, Message::Control(message), true); } @@ -560,7 +566,36 @@ impl CompPDL { fn handle_incoming_sync_message(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx, message: SyncMessage) { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); - self.handle_sync_decision(sched_ctx, comp_ctx, decision); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + } + + /// Handles an error coming from the generic `component::handle_xxx` + /// functions. Hence accepts argument as a tuple. + fn handle_component_error(&mut self, sched_ctx: &SchedulerCtx, location_and_message: (PortInstruction, String)) { + // Retrieve location and message, display in terminal + let (location, message) = location_and_message; + let error = match location { + PortInstruction::None => CompError::Component(message), + PortInstruction::NoSource => unreachable!(), // for debugging: all in-sync errors are associated with a source location + PortInstruction::SourceLocation(expression_id) => { + let protocol = &sched_ctx.runtime.protocol; + CompError::Executor(EvalError::new_error_at_expr( + &self.prompt, &protocol.modules, &protocol.heap, + expression_id, message + )) + } + }; + + sched_ctx.error(&format!("{}", error)); + + // Set state to handle subsequent error + let exit_reason = if self.exec_state.mode.is_in_sync_block() { + ExitReason::ErrorInSync + } else { + ExitReason::ErrorNonSync + }; + + self.exec_state.set_as_start_exit(exit_reason); } // -------------------------------------------------------------------------