use rand::prelude as random; use rand::RngCore; use crate::protocol::eval::{ValueGroup, Value, EvalError}; use crate::runtime2::*; use super::*; use super::component::{self, Component, CompExecState, CompScheduling, CompMode}; use super::control_layer::*; use super::consensus::*; /// TODO: Temporary component to figure out what to do with custom components. /// This component sends random numbers between two u32 limits pub struct ComponentRandomU32 { // Properties for this specific component output_port_id: PortId, random_minimum: u32, random_maximum: u32, num_sends: u32, max_num_sends: u32, generator: random::ThreadRng, // Generic state-tracking exec_state: CompExecState, did_perform_send: bool, // when in sync mode control: ControlLayer, consensus: Consensus, } impl Component for ComponentRandomU32 { fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) { // Impossible since this component does not have any input ports in its // signature. unreachable!(); } fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { match message { Message::Data(_message) => unreachable!(), Message::Sync(message) => { let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); self.handle_sync_decision(sched_ctx, comp_ctx, decision); }, Message::Control(message) => { component::default_handle_control_message( &mut self.exec_state, &mut self.control, &mut self.consensus, message, sched_ctx, comp_ctx ); } } } fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); match self.exec_state.mode { CompMode::BlockedGet | CompMode::BlockedSelect => { // impossible for this component, no input ports and no select // blocks unreachable!(); } CompMode::NonSync => { // If in non-sync mode then we check if the arguments make sense // (at some point in the future, this is just a testing // component). if self.random_minimum >= self.random_maximum { // Could throw an evaluation error, but lets just panic panic!("going to crash 'n burn your system now, please provide valid arguments"); } if self.num_sends >= self.max_num_sends { self.exec_state.mode = CompMode::StartExit; } else { sched_ctx.log("Entering sync mode"); self.did_perform_send = false; self.consensus.notify_sync_start(comp_ctx); self.exec_state.mode = CompMode::Sync; } return Ok(CompScheduling::Immediate); }, CompMode::Sync => { // This component just sends a single message, then waits until // consensus has been reached if !self.did_perform_send { sched_ctx.log("Sending random message"); let mut random = self.generator.next_u32() - self.random_minimum; let random_delta = self.random_maximum - self.random_minimum; random %= random_delta; random += self.random_minimum; let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]); let port_handle = comp_ctx.get_port_handle(self.output_port_id); let port_info = comp_ctx.get_port(port_handle); let scheduling = if port_info.state.is_blocked() { // Need to wait until we can send the message self.exec_state.set_as_blocked_put(self.output_port_id, value_group); CompScheduling::Sleep } else { let message = self.consensus.annotate_data_message(comp_ctx, port_info, value_group); let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); let peer_info = comp_ctx.get_peer(peer_handle); peer_info.handle.send_message(sched_ctx, Message::Data(message), true); // Remain in sync mode, but after `did_perform_send` was // set to true. CompScheduling::Immediate }; // Blocked or not, we set `did_perform_send` to true. If // blocked then the moment we become unblocked (and are back // at the `Sync` mode) we have sent the message. self.did_perform_send = true; self.num_sends += 1; return Ok(scheduling) } else { // Message was sent, finish this sync round sched_ctx.log("Waiting for consensus"); self.exec_state.mode = CompMode::SyncEnd; let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); self.handle_sync_decision(sched_ctx, comp_ctx, decision); return Ok(CompScheduling::Requeue); } }, CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), CompMode::StartExit => return Ok(component::default_handle_start_exit( &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx )), CompMode::BusyExit => return Ok(component::default_handle_busy_exit( &mut self.exec_state, &self.control, sched_ctx )), CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), } } } impl ComponentRandomU32 { pub(crate) fn new(arguments: ValueGroup) -> Self { debug_assert_eq!(arguments.values.len(), 4); debug_assert!(arguments.regions.is_empty()); let port_id = component::port_id_from_eval(arguments.values[0].as_port_id()); let minimum = arguments.values[1].as_uint32(); let maximum = arguments.values[2].as_uint32(); let num_sends = arguments.values[3].as_uint32(); return Self{ output_port_id: port_id, random_minimum: minimum, random_maximum: maximum, num_sends: 0, max_num_sends: num_sends, generator: random::thread_rng(), exec_state: CompExecState::new(), did_perform_send: false, control: ControlLayer::default(), consensus: Consensus::new(), } } fn handle_sync_decision(&mut self, _sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) { let success = match decision { SyncRoundDecision::None => return, SyncRoundDecision::Solution => true, SyncRoundDecision::Failure => false, }; debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd); if success { self.exec_state.mode = CompMode::NonSync; self.consensus.notify_sync_decision(decision); } else { self.exec_state.mode = CompMode::StartExit; } } }