use rand::prelude as random; use rand::RngCore; use crate::protocol::eval::{ValueGroup, Value}; use crate::runtime2::*; use super::*; use super::component::{ self, Component, CompExecState, CompScheduling, CompMode, ExitReason }; use super::control_layer::*; use super::consensus::*; /// TODO: Temporary component to figure out what to do with custom components. /// This component sends random numbers between two u32 limits pub struct ComponentRandomU32 { // Properties for this specific component output_port_id: PortId, random_minimum: u32, random_maximum: u32, num_sends: u32, max_num_sends: u32, generator: random::ThreadRng, // Generic state-tracking exec_state: CompExecState, did_perform_send: bool, // when in sync mode control: ControlLayer, consensus: Consensus, } impl Component for ComponentRandomU32 { fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {} fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) {} fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) { // Impossible since this component does not have any input ports in its // signature. unreachable!(); } fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { match message { Message::Data(_message) => unreachable!(), Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus); }, Message::Control(message) => { if let Err(location_and_message) = component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, message, sched_ctx, comp_ctx ) { component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); } }, Message::Poll => unreachable!(), } } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling { sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); match self.exec_state.mode { CompMode::BlockedGet | CompMode::BlockedSelect => { // impossible for this component, no input ports and no select // blocks unreachable!(); } CompMode::NonSync => { // If in non-sync mode then we check if the arguments make sense // (at some point in the future, this is just a testing // component). if self.random_minimum >= self.random_maximum { // Could throw an evaluation error, but lets just panic panic!("going to crash 'n burn your system now, please provide valid arguments"); } if self.num_sends >= self.max_num_sends { self.exec_state.set_as_start_exit(ExitReason::Termination); } else { sched_ctx.info("Entering sync mode"); self.did_perform_send = false; component::default_handle_sync_start( &mut self.exec_state, &mut [], sched_ctx, comp_ctx, &mut self.consensus ); } return CompScheduling::Immediate; }, CompMode::Sync => { // This component just sends a single message, then waits until // consensus has been reached if !self.did_perform_send { sched_ctx.info("Sending random message"); let mut random = self.generator.next_u32() - self.random_minimum; let random_delta = self.random_maximum - self.random_minimum; random %= random_delta; random += self.random_minimum; let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]); let send_result = component::default_send_data_message( &mut self.exec_state, self.output_port_id, PortInstruction::NoSource, value_group, sched_ctx, &mut self.consensus, comp_ctx ); if let Err(location_and_message) = send_result { component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message); return CompScheduling::Immediate } else { // Blocked or not, we set `did_perform_send` to true. If // blocked then the moment we become unblocked (and are back // at the `Sync` mode) we have sent the message. let scheduling = send_result.unwrap(); self.did_perform_send = true; self.num_sends += 1; return scheduling } } else { // Message was sent, finish this sync round component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus); return CompScheduling::Requeue; } }, CompMode::SyncEnd | CompMode::BlockedPut => return CompScheduling::Sleep, CompMode::StartExit => return component::default_handle_start_exit( &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus ), CompMode::BusyExit => return component::default_handle_busy_exit( &mut self.exec_state, &self.control, sched_ctx ), CompMode::Exit => return component::default_handle_exit(&self.exec_state), } } } impl ComponentRandomU32 { pub(crate) fn new(arguments: ValueGroup) -> Self { debug_assert_eq!(arguments.values.len(), 4); debug_assert!(arguments.regions.is_empty()); let port_id = component::port_id_from_eval(arguments.values[0].as_port_id()); let minimum = arguments.values[1].as_uint32(); let maximum = arguments.values[2].as_uint32(); let num_sends = arguments.values[3].as_uint32(); return Self{ output_port_id: port_id, random_minimum: minimum, random_maximum: maximum, num_sends: 0, max_num_sends: num_sends, generator: random::thread_rng(), exec_state: CompExecState::new(), did_perform_send: false, control: ControlLayer::default(), consensus: Consensus::new(), } } }