diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs index dce5d709a86cf6b0e73d63bef292113aa38f691c..dc8a63334b8e19a37604d51f35b79cd67734cf8d 100644 --- a/src/runtime2/component/component_random.rs +++ b/src/runtime2/component/component_random.rs @@ -1,11 +1,15 @@ use rand::prelude as random; use rand::RngCore; -use crate::protocol::eval::{ValueGroup, Value, EvalError}; +use crate::protocol::eval::{ValueGroup, Value}; use crate::runtime2::*; use super::*; -use super::component::{self, Component, CompExecState, CompScheduling, CompMode}; +use super::component::{ + self, + Component, CompExecState, CompScheduling, + CompMode, ExitReason +}; use super::control_layer::*; use super::consensus::*; @@ -42,19 +46,21 @@ impl Component for ComponentRandomU32 { Message::Data(_message) => unreachable!(), Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); - component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); }, Message::Control(message) => { - component::default_handle_control_message( + if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, message, sched_ctx, comp_ctx - ); + ) { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + } }, Message::Poll => unreachable!(), } } - fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); match self.exec_state.mode { @@ -73,7 +79,7 @@ impl Component for ComponentRandomU32 { } if self.num_sends >= self.max_num_sends { - self.exec_state.mode = CompMode::StartExit; + self.exec_state.set_as_start_exit(ExitReason::Termination); } else { sched_ctx.log("Entering sync mode"); self.did_perform_send = false; @@ -81,7 +87,7 @@ impl Component for ComponentRandomU32 { self.exec_state.mode = CompMode::Sync; } - return Ok(CompScheduling::Immediate); + return CompScheduling::Immediate; }, CompMode::Sync => { // This component just sends a single message, then waits until @@ -94,34 +100,41 @@ impl Component for ComponentRandomU32 { random += self.random_minimum; let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]); - let scheduling = component::default_send_data_message( - &mut self.exec_state, self.output_port_id, value_group, + let send_result = component::default_send_data_message( + &mut self.exec_state, self.output_port_id, + PortInstruction::NoSource, value_group, sched_ctx, &mut self.consensus, comp_ctx ); - // Blocked or not, we set `did_perform_send` to true. If - // blocked then the moment we become unblocked (and are back - // at the `Sync` mode) we have sent the message. - self.did_perform_send = true; - self.num_sends += 1; - return Ok(scheduling) + if let Err(location_and_message) = send_result { + component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); + return CompScheduling::Immediate + } else { + // Blocked or not, we set `did_perform_send` to true. If + // blocked then the moment we become unblocked (and are back + // at the `Sync` mode) we have sent the message. + let scheduling = send_result.unwrap(); + self.did_perform_send = true; + self.num_sends += 1; + return scheduling + } } else { // Message was sent, finish this sync round sched_ctx.log("Waiting for consensus"); self.exec_state.mode = CompMode::SyncEnd; - let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); - component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); - return Ok(CompScheduling::Requeue); + let decision = self.consensus.notify_sync_end_success(sched_ctx, comp_ctx); + component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, decision, &mut self.consensus); + return CompScheduling::Requeue; } }, - CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), - CompMode::StartExit => return Ok(component::default_handle_start_exit( - &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx - )), - CompMode::BusyExit => return Ok(component::default_handle_busy_exit( + CompMode::SyncEnd | CompMode::BlockedPut => return CompScheduling::Sleep, + CompMode::StartExit => return component::default_handle_start_exit( + &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus + ), + CompMode::BusyExit => return component::default_handle_busy_exit( &mut self.exec_state, &self.control, sched_ctx - )), - CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), + ), + CompMode::Exit => return component::default_handle_exit(&self.exec_state), } } }