diff --git a/src/runtime2/component/component_random.rs b/src/runtime2/component/component_random.rs new file mode 100644 index 0000000000000000000000000000000000000000..56ffe495972a8f55b68e4927b38b07a945fa01cc --- /dev/null +++ b/src/runtime2/component/component_random.rs @@ -0,0 +1,161 @@ +use rand::prelude as random; +use rand::RngCore; + +use crate::protocol::eval::{ValueGroup, Value, EvalError}; +use crate::runtime2::*; + +use super::*; +use super::component::{self, Component, CompExecState, CompScheduling, CompMode}; +use super::control_layer::*; +use super::consensus::*; + +/// TODO: Temporary component to figure out what to do with custom components. +/// This component sends random numbers between two u32 limits +pub struct ComponentRandomU32 { + // Properties for this specific component + output_port_id: PortId, + random_minimum: u32, + random_maximum: u32, + num_sends: u32, + max_num_sends: u32, + generator: random::ThreadRng, + // Generic state-tracking + exec_state: CompExecState, + did_perform_send: bool, // when in sync mode + control: ControlLayer, + consensus: Consensus, +} + +impl Component for ComponentRandomU32 { + fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) { + // Impossible since this component does not have any input ports in its + // signature. + unreachable!(); + } + + fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) { + match message { + Message::Data(_message) => unreachable!(), + Message::Sync(message) => { + let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message); + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + }, + Message::Control(message) => { + component::default_handle_control_message( + &mut self.exec_state, &mut self.control, &mut self.consensus, + message, sched_ctx, comp_ctx + ); + }, + Message::Poll => unreachable!(), + } + } + + fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result { + sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode)); + + match self.exec_state.mode { + CompMode::BlockedGet | CompMode::BlockedSelect => { + // impossible for this component, no input ports and no select + // blocks + unreachable!(); + } + CompMode::NonSync => { + // If in non-sync mode then we check if the arguments make sense + // (at some point in the future, this is just a testing + // component). + if self.random_minimum >= self.random_maximum { + // Could throw an evaluation error, but lets just panic + panic!("going to crash 'n burn your system now, please provide valid arguments"); + } + + if self.num_sends >= self.max_num_sends { + self.exec_state.mode = CompMode::StartExit; + } else { + sched_ctx.log("Entering sync mode"); + self.did_perform_send = false; + self.consensus.notify_sync_start(comp_ctx); + self.exec_state.mode = CompMode::Sync; + } + + return Ok(CompScheduling::Immediate); + }, + CompMode::Sync => { + // This component just sends a single message, then waits until + // consensus has been reached + if !self.did_perform_send { + sched_ctx.log("Sending random message"); + let mut random = self.generator.next_u32() - self.random_minimum; + let random_delta = self.random_maximum - self.random_minimum; + random %= random_delta; + random += self.random_minimum; + let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]); + + let port_handle = comp_ctx.get_port_handle(self.output_port_id); + let port_info = comp_ctx.get_port(port_handle); + + let scheduling = if port_info.state.is_blocked() { + // Need to wait until we can send the message + self.exec_state.set_as_blocked_put(self.output_port_id, value_group); + + CompScheduling::Sleep + } else { + let message = self.consensus.annotate_data_message(comp_ctx, port_info, value_group); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message(&sched_ctx.runtime, Message::Data(message), true); + + // Remain in sync mode, but after `did_perform_send` was + // set to true. + CompScheduling::Immediate + }; + + // Blocked or not, we set `did_perform_send` to true. If + // blocked then the moment we become unblocked (and are back + // at the `Sync` mode) we have sent the message. + self.did_perform_send = true; + self.num_sends += 1; + return Ok(scheduling) + } else { + // Message was sent, finish this sync round + sched_ctx.log("Waiting for consensus"); + self.exec_state.mode = CompMode::SyncEnd; + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus); + return Ok(CompScheduling::Requeue); + } + }, + CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), + CompMode::StartExit => return Ok(component::default_handle_start_exit( + &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx + )), + CompMode::BusyExit => return Ok(component::default_handle_busy_exit( + &mut self.exec_state, &self.control, sched_ctx + )), + CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)), + } + } +} + +impl ComponentRandomU32 { + pub(crate) fn new(arguments: ValueGroup) -> Self { + debug_assert_eq!(arguments.values.len(), 4); + debug_assert!(arguments.regions.is_empty()); + let port_id = component::port_id_from_eval(arguments.values[0].as_port_id()); + let minimum = arguments.values[1].as_uint32(); + let maximum = arguments.values[2].as_uint32(); + let num_sends = arguments.values[3].as_uint32(); + + return Self{ + output_port_id: port_id, + random_minimum: minimum, + random_maximum: maximum, + num_sends: 0, + max_num_sends: num_sends, + generator: random::thread_rng(), + exec_state: CompExecState::new(), + did_perform_send: false, + control: ControlLayer::default(), + consensus: Consensus::new(), + } + } +} \ No newline at end of file