use rand::prelude as random; use rand::RngCore; use crate::protocol::eval::{ValueGroup, Value, EvalError}; use crate::runtime2::*; use super::*; use super::component::{self, Component, CompExecState, CompScheduling, CompMode}; use super::control_layer::*; use super::consensus::*; /// TODO: Temporary component to figure out what to do with custom components. /// This component sends random numbers between two u32 limits pub struct ComponentRandomU32 { // Properties for this specific component output_port_id: PortId, random_minimum: u32, random_maximum: u32, num_sends: u32, max_num_sends: u32, generator: random::ThreadRng, // Generic state-tracking exec_state: CompExecState, did_perform_send: bool, // when in sync mode control: ControlLayer, consensus: Consensus, } impl Component for ComponentRandomU32 { fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {} fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) {} fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) { // Impossible since this component does not have any input ports in its // signature. unreachable!(); } fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { match message { Message::Data(_message) => unreachable!(), Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); }, Message::Control(message) => { component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, message, sched_ctx, comp_ctx ); }, Message::Poll => unreachable!(), } } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); match self.exec_state.mode { CompMode::BlockedGet | CompMode::BlockedSelect => { // impossible for this component, no input ports and no select // blocks unreachable!(); } CompMode::NonSync => { // If in non-sync mode then we check if the arguments make sense // (at some point in the future, this is just a testing // component). if self.random_minimum >= self.random_maximum { // Could throw an evaluation error, but lets just panic panic!("going to crash 'n burn your system now, please provide valid arguments"); } if self.num_sends >= self.max_num_sends { self.exec_state.mode = CompMode::StartExit; } else { sched_ctx.log("Entering sync mode"); self.did_perform_send = false; self.consensus.notify_sync_start(comp_ctx); self.exec_state.mode = CompMode::Sync; } return Ok(CompScheduling::Immediate); }, CompMode::Sync => { // This component just sends a single message, then waits until // consensus has been reached if !self.did_perform_send { sched_ctx.log("Sending random message"); let mut random = self.generator.next_u32() - self.random_minimum; let random_delta = self.random_maximum - self.random_minimum; random %= random_delta; random += self.random_minimum; let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]); let scheduling = component::default_send_data_message( &mut self.exec_state, self.output_port_id, value_group, sched_ctx, &mut self.consensus, comp_ctx ); // Blocked or not, we set `did_perform_send` to true. If // blocked then the moment we become unblocked (and are back // at the `Sync` mode) we have sent the message. self.did_perform_send = true; self.num_sends += 1; return Ok(scheduling) } else { // Message was sent, finish this sync round sched_ctx.log("Waiting for consensus"); self.exec_state.mode = CompMode::SyncEnd; let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); return Ok(CompScheduling::Requeue); } }, CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), CompMode::StartExit => return Ok(component::default_handle_start_exit( &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx )), CompMode::BusyExit => return Ok(component::default_handle_busy_exit( &mut self.exec_state, &self.control, sched_ctx )), CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), } } } impl ComponentRandomU32 { pub(crate) fn new(arguments: ValueGroup) -> Self { debug_assert_eq!(arguments.values.len(), 4); debug_assert!(arguments.regions.is_empty()); let port_id = component::port_id_from_eval(arguments.values[0].as_port_id()); let minimum = arguments.values[1].as_uint32(); let maximum = arguments.values[2].as_uint32(); let num_sends = arguments.values[3].as_uint32(); return Self{ output_port_id: port_id, random_minimum: minimum, random_maximum: maximum, num_sends: 0, max_num_sends: num_sends, generator: random::thread_rng(), exec_state: CompExecState::new(), did_perform_send: false, control: ControlLayer::default(), consensus: Consensus::new(), } } }