diff --git a/src/runtime2/component/component_ip.rs b/src/runtime2/component/component_ip.rs index 87c65d3baa7173c5dabd768677bd9cea5cce17ba..1b9b906d48addf922f4f4edc8312d6aeaee8d9ac 100644 --- a/src/runtime2/component/component_ip.rs +++ b/src/runtime2/component/component_ip.rs @@ -1,5 +1,9 @@ -use crate::protocol::eval::{ValueGroup, EvalError}; +use rand::prelude as random; +use rand::RngCore; + +use crate::protocol::eval::{ValueGroup, Value, EvalError}; use crate::runtime2::*; + use super::*; use super::component::{self, Component, CompExecState, CompScheduling, CompMode}; use super::control_layer::*; @@ -12,8 +16,10 @@ pub struct ComponentRandomU32 { output_port_id: PortId, random_minimum: u32, random_maximum: u32, + generator: random::ThreadRng, // Generic state-tracking exec_state: CompExecState, + did_perform_send: bool, // when in sync mode control: ControlLayer, consensus: Consensus, } @@ -53,10 +59,61 @@ impl Component for ComponentRandomU32 { CompMode::NonSync => { // If in non-sync mode then we check if the arguments make sense // (at some point in the future, this is just a testing - // component) + // component). + if self.random_minimum >= self.random_maximum { + // Could throw an evaluation error, but lets just panic + panic!("going to crash 'n burn your system now, please provide valid arguments"); + } + + sched_ctx.log("Entering sync mode"); + self.did_perform_send = false; + self.consensus.notify_sync_start(comp_ctx); + self.exec_state.mode = CompMode::Sync; + return Ok(CompScheduling::Immediate); }, CompMode::Sync => { + // This component just sends a single message, then waits until + // consensus has been reached + if !self.did_perform_send { + sched_ctx.log("Sending random message"); + let mut random = self.generator.next_u32() - self.random_minimum; + let random_delta = self.random_maximum - self.random_minimum; + random %= random_delta; + random += self.random_minimum; + let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]); + + let port_handle = comp_ctx.get_port_handle(self.output_port_id); + let port_info = comp_ctx.get_port(port_handle); + + let scheduling = if port_info.state.is_blocked() { + // Need to wait until we can send the message + self.exec_state.set_as_blocked_put(self.output_port_id, value_group); + CompScheduling::Sleep + } else { + let message = self.consensus.annotate_data_message(comp_ctx, port_info, value_group); + let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id); + let peer_info = comp_ctx.get_peer(peer_handle); + peer_info.handle.send_message(sched_ctx, Message::Data(message), true); + + // Remain in sync mode, but after `did_perform_send` was + // set to true. + CompScheduling::Immediate + }; + + // Blocked or not, we set `did_perform_send` to true. If + // blocked then the moment we become unblocked (and are back + // at the `Sync` mode) we have sent the message. + self.did_perform_send = true; + return Ok(scheduling) + } else { + // Message was sent, finish this sync round + sched_ctx.log("Waiting for consensus"); + self.exec_state.mode = CompMode::SyncEnd; + let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx); + self.handle_sync_decision(sched_ctx, comp_ctx, decision); + return Ok(CompScheduling::Requeue); + } }, CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep), CompMode::StartExit => return Ok(component::default_handle_start_exit( @@ -82,14 +139,14 @@ impl ComponentRandomU32 { output_port_id: port_id, random_minimum: minimum, random_maximum: maximum, + generator: random::thread_rng(), exec_state: CompExecState::new(), + did_perform_send: false, control: ControlLayer::default(), consensus: Consensus::new(), } } - - fn handle_sync_decision(&mut self, _sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) { let success = match decision { SyncRoundDecision::None => return,