Files @ 1bc57ab68e0e
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component_random.rs - annotation

1bc57ab68e0e 6.4 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
Max Henger
Merge branch 'feat-builtin-ip' into 'master'

feat: Builtin internet component

See merge request nl-cwi-csy/reowolf!6
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
use rand::prelude as random;
use rand::RngCore;

use crate::protocol::eval::{ValueGroup, Value, EvalError};
use crate::runtime2::*;

use super::*;
use super::component::{self, Component, CompExecState, CompScheduling, CompMode};
use super::control_layer::*;
use super::consensus::*;

/// TODO: Temporary component to figure out what to do with custom components.
///     This component sends random numbers between two u32 limits
///
/// The component has a single output port and no input ports. Each sync round
/// it sends one random `u32` over that port, and it starts exiting once the
/// configured number of sends has been reached.
pub struct ComponentRandomU32 {
    // Properties for this specific component
    // Port over which the random values are sent.
    output_port_id: PortId,
    // Lower limit of the sent values (inclusive).
    random_minimum: u32,
    // Upper limit of the sent values (exclusive). `run` panics if the minimum
    // is not strictly smaller than this value.
    random_maximum: u32,
    // Number of sends initiated so far; incremented once per sync round.
    num_sends: u32,
    // Once `num_sends` reaches this value the component starts its exit
    // procedure instead of entering another sync round.
    max_num_sends: u32,
    // Source of the raw random `u32` samples.
    generator: random::ThreadRng,
    // Generic state-tracking
    // Holds the current execution mode (`CompMode`) that `run` dispatches on.
    exec_state: CompExecState,
    // Only meaningful in sync mode: reset to `false` when a sync round is
    // entered, set to `true` once the send for that round has been initiated.
    did_perform_send: bool,
    // Handed to the default control-message and exit handlers.
    control: ControlLayer,
    // Handed to the default sync/send handlers; notified at the start and end
    // of every sync round.
    consensus: Consensus,
}

impl Component for ComponentRandomU32 {
    /// No-op: nothing is done when the component is created.
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {}

    /// No-op: nothing is done when the component is shut down.
    fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) {}

    /// Never expected to be called: the component has no input ports, so no
    /// data message can be addressed to it.
    ///
    /// # Panics
    /// Always panics through `unreachable!`.
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
        // Impossible since this component does not have any input ports in its
        // signature.
        unreachable!();
    }

    /// Dispatches an incoming message to the default sync/control handlers.
    ///
    /// # Panics
    /// Panics through `unreachable!` on `Message::Data` and `Message::Poll`,
    /// neither of which this component expects to receive.
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
        match message {
            Message::Data(_message) => unreachable!(),
            Message::Sync(message) => {
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
                component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
            },
            Message::Control(message) => {
                component::default_handle_control_message(
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
                    message, sched_ctx, comp_ctx
                );
            },
            Message::Poll => unreachable!(),
        }
    }

    /// Advances the component by one step, depending on its current mode.
    ///
    /// * `NonSync`: validates the limits, then either enters a new sync round
    ///   or, when `max_num_sends` has been reached, starts exiting.
    /// * `Sync`: on the first visit sends one random value in
    ///   `[random_minimum, random_maximum)`; on the next visit ends the sync
    ///   round and waits for the consensus decision.
    /// * `SyncEnd` / `BlockedPut`: nothing to do, the component sleeps.
    /// * `StartExit` / `BusyExit` / `Exit`: forwarded to the default exit
    ///   handlers.
    ///
    /// Returns how the scheduler should treat the component next. This
    /// implementation never produces an `Err`.
    ///
    /// # Panics
    /// Panics if `random_minimum >= random_maximum` while in `NonSync` mode,
    /// and through `unreachable!` when in `BlockedGet` or `BlockedSelect` mode.
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
        sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));

        match self.exec_state.mode {
            CompMode::BlockedGet | CompMode::BlockedSelect => {
                // impossible for this component, no input ports and no select
                // blocks
                unreachable!();
            }
            CompMode::NonSync => {
                // If in non-sync mode then we check if the arguments make sense
                // (at some point in the future, this is just a testing
                // component).
                if self.random_minimum >= self.random_maximum {
                    // Could throw an evaluation error, but lets just panic
                    panic!("going to crash 'n burn your system now, please provide valid arguments");
                }

                if self.num_sends >= self.max_num_sends {
                    self.exec_state.mode = CompMode::StartExit;
                } else {
                    sched_ctx.log("Entering sync mode");
                    self.did_perform_send = false;
                    self.consensus.notify_sync_start(comp_ctx);
                    self.exec_state.mode = CompMode::Sync;
                }

                return Ok(CompScheduling::Immediate);
            },
            CompMode::Sync => {
                // This component just sends a single message, then waits until
                // consensus has been reached
                if !self.did_perform_send {
                    sched_ctx.log("Sending random message");

                    // Fold the raw sample into `[random_minimum, random_maximum)`.
                    // The remainder is strictly smaller than `random_delta`, so
                    // adding it to the minimum stays below the maximum and
                    // cannot overflow. (Subtracting the minimum from the raw
                    // sample first, as was done before, underflows whenever
                    // the sample is smaller than the minimum.)
                    // The `NonSync` branch panics on `minimum >= maximum`
                    // before it switches to `Sync`, so the delta is expected
                    // to be non-zero here.
                    let random_delta = self.random_maximum - self.random_minimum;
                    let random = self.random_minimum + self.generator.next_u32() % random_delta;
                    let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]);

                    let scheduling = component::default_send_data_message(
                        &mut self.exec_state, self.output_port_id, value_group,
                        sched_ctx, &mut self.consensus, comp_ctx
                    );

                    // Blocked or not, we set `did_perform_send` to true. If
                    // blocked then the moment we become unblocked (and are back
                    // at the `Sync` mode) we have sent the message.
                    self.did_perform_send = true;
                    self.num_sends += 1;
                    return Ok(scheduling);
                } else {
                    // Message was sent, finish this sync round
                    sched_ctx.log("Waiting for consensus");
                    self.exec_state.mode = CompMode::SyncEnd;
                    let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
                    component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
                    return Ok(CompScheduling::Requeue);
                }
            },
            CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep),
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
            )),
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
                &mut self.exec_state, &self.control, sched_ctx
            )),
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
        }
    }
}

impl ComponentRandomU32 {
    /// Builds the component from its evaluated argument list.
    ///
    /// `arguments.values` is read positionally:
    /// 0. the output port,
    /// 1. the minimum of the random range,
    /// 2. the maximum of the random range,
    /// 3. the number of values to send before exiting.
    ///
    /// Debug builds assert that exactly four values and no regions were
    /// supplied. The send counter starts at zero and all generic
    /// state-tracking fields start in their fresh/default state.
    pub(crate) fn new(arguments: ValueGroup) -> Self {
        debug_assert_eq!(arguments.values.len(), 4);
        debug_assert!(arguments.regions.is_empty());

        // Fields are initialised in argument order, so the values are decoded
        // in the same sequence as they were passed.
        let args = &arguments.values;
        Self {
            output_port_id: component::port_id_from_eval(args[0].as_port_id()),
            random_minimum: args[1].as_uint32(),
            random_maximum: args[2].as_uint32(),
            num_sends: 0,
            max_num_sends: args[3].as_uint32(),
            generator: random::thread_rng(),
            exec_state: CompExecState::new(),
            did_perform_send: false,
            control: ControlLayer::default(),
            consensus: Consensus::new(),
        }
    }
}