Files @ ac804a4a3d70
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component_random.rs

ac804a4a3d70 6.9 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
mh
More granularity in debug logging
use rand::prelude as random;
use rand::RngCore;

use crate::protocol::eval::{ValueGroup, Value};
use crate::runtime2::*;

use super::*;
use super::component::{
    self,
    Component, CompExecState, CompScheduling,
    CompMode, ExitReason
};
use super::control_layer::*;
use super::consensus::*;

/// Built-in component that sends pseudo-random `u32` values over a single
/// output port.
///
/// Every sync round sends exactly one value, chosen to lie in the half-open
/// range `[random_minimum, random_maximum)`. Once `max_num_sends` values have
/// been sent the component starts exiting.
///
/// TODO: Temporary component to figure out what to do with custom components.
pub struct ComponentRandomU32 {
    // Properties for this specific component
    /// Port on which the generated values are sent.
    output_port_id: PortId,
    /// Inclusive lower bound of the generated values. `run` panics if this
    /// is not strictly smaller than `random_maximum`.
    random_minimum: u32,
    /// Exclusive upper bound of the generated values.
    random_maximum: u32,
    /// Number of sends performed so far; starts at zero and is incremented
    /// after every send attempt that did not return an error.
    num_sends: u32,
    /// Number of sends after which the component begins to exit.
    max_num_sends: u32,
    /// Source of the raw random `u32` values (`rand`'s `ThreadRng`).
    generator: random::ThreadRng,
    // Generic state-tracking
    /// Execution state (including the current `CompMode`) that is handed to
    /// the shared `component::default_*` helpers.
    exec_state: CompExecState,
    /// Only meaningful in sync mode: reset to `false` when a sync round is
    /// started, set to `true` once the message of that round has been sent.
    did_perform_send: bool,
    /// Control-layer state passed to the control-message and exit helpers.
    control: ControlLayer,
    /// Consensus state passed to the sync, send and exit helpers.
    consensus: Consensus,
}

impl Component for ComponentRandomU32 {
    /// No setup is performed when the component is created.
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {}

    /// No teardown is performed when the component shuts down.
    fn on_shutdown(&mut self, _sched_ctx: &SchedulerCtx) {}

    /// Never expected to be called for this component.
    ///
    /// # Panics
    ///
    /// Always panics (`unreachable!`).
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
        // Impossible since this component does not have any input ports in its
        // signature.
        unreachable!();
    }

    /// Handles an incoming message by delegating to the shared default
    /// handlers.
    ///
    /// Sync messages are fed to the consensus algorithm and the resulting
    /// decision is applied. Control messages go through the default control
    /// handler; an error it reports is forwarded to the default error handler
    /// for builtin components.
    ///
    /// # Panics
    ///
    /// Panics (`unreachable!`) on data and poll messages, which this component
    /// does not expect to receive.
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
        match message {
            // No input ports, so nobody can send us data
            Message::Data(_message) => unreachable!(),
            Message::Sync(message) => {
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
            },
            Message::Control(message) => {
                if let Err(location_and_message) = component::default_handle_control_message(
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
                    message, sched_ctx, comp_ctx
                ) {
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
                }
            },
            Message::Poll => unreachable!(),
        }
    }

    /// Performs one scheduling step, based on the current execution mode, and
    /// returns how the component wants to be scheduled next.
    ///
    /// * `NonSync`: validates the limits, then either starts exiting (all
    ///   sends were performed) or starts a new sync round.
    /// * `Sync`: sends one random value in `[random_minimum, random_maximum)`
    ///   if that did not happen yet this round, otherwise ends the sync round.
    /// * `SyncEnd`/`BlockedPut`: sleeps.
    /// * `StartExit`/`BusyExit`/`Exit`: delegates to the default exit handlers.
    ///
    /// # Panics
    ///
    /// Panics in `NonSync` mode if `random_minimum >= random_maximum`, and
    /// (`unreachable!`) in the `BlockedGet`/`BlockedSelect` modes.
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
        sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));

        match self.exec_state.mode {
            CompMode::BlockedGet | CompMode::BlockedSelect => {
                // impossible for this component, no input ports and no select
                // blocks
                unreachable!();
            }
            CompMode::NonSync => {
                // If in non-sync mode then we check if the arguments make sense
                // (at some point in the future, this is just a testing
                // component).
                if self.random_minimum >= self.random_maximum {
                    // Could throw an evaluation error, but lets just panic
                    panic!("going to crash 'n burn your system now, please provide valid arguments");
                }

                if self.num_sends >= self.max_num_sends {
                    self.exec_state.set_as_start_exit(ExitReason::Termination);
                } else {
                    sched_ctx.info("Entering sync mode");
                    self.did_perform_send = false;
                    component::default_handle_sync_start(
                        &mut self.exec_state, &mut [], sched_ctx, comp_ctx, &mut self.consensus
                    );
                }

                return CompScheduling::Immediate;
            },
            CompMode::Sync => {
                // This component just sends a single message, then waits until
                // consensus has been reached
                if self.did_perform_send {
                    // Message was sent, finish this sync round
                    component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
                    return CompScheduling::Requeue;
                }

                sched_ctx.info("Sending random message");

                // Map the raw draw onto `[minimum, maximum)`. The `NonSync`
                // branch only starts a sync round when `minimum < maximum`, so
                // the delta is non-zero and `minimum + (x % delta) < maximum`
                // cannot overflow. The raw value is deliberately not reduced
                // by `minimum` first: that subtraction underflows (and panics
                // in overflow-checked builds) whenever the draw is smaller
                // than `minimum`.
                let random_delta = self.random_maximum - self.random_minimum;
                let random = self.random_minimum + self.generator.next_u32() % random_delta;
                let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]);

                let send_result = component::default_send_data_message(
                    &mut self.exec_state, self.output_port_id,
                    PortInstruction::NoSource, value_group,
                    sched_ctx, &mut self.consensus, comp_ctx
                );

                match send_result {
                    Ok(scheduling) => {
                        // Blocked or not, we set `did_perform_send` to true. If
                        // blocked then the moment we become unblocked (and are
                        // back at the `Sync` mode) we have sent the message.
                        self.did_perform_send = true;
                        self.num_sends += 1;
                        return scheduling;
                    },
                    Err(location_and_message) => {
                        component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
                        return CompScheduling::Immediate;
                    },
                }
            },
            CompMode::SyncEnd | CompMode::BlockedPut => return CompScheduling::Sleep,
            CompMode::StartExit => return component::default_handle_start_exit(
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
            ),
            CompMode::BusyExit => return component::default_handle_busy_exit(
                &mut self.exec_state, &self.control, sched_ctx
            ),
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
        }
    }
}

impl ComponentRandomU32 {
    /// Builds the component from its evaluated arguments.
    ///
    /// `arguments` is expected to hold exactly four stack values and no heap
    /// regions (both checked in debug builds only), in this order:
    ///
    /// 0. the output port to send on,
    /// 1. the minimum random value (`u32`),
    /// 2. the maximum random value (`u32`),
    /// 3. the number of values to send before exiting (`u32`).
    pub(crate) fn new(arguments: ValueGroup) -> Self {
        debug_assert_eq!(arguments.values.len(), 4);
        debug_assert!(arguments.regions.is_empty());

        // Decode the arguments in positional order
        let values = &arguments.values;
        let output_port_id = component::port_id_from_eval(values[0].as_port_id());
        let random_minimum = values[1].as_uint32();
        let random_maximum = values[2].as_uint32();
        let max_num_sends = values[3].as_uint32();

        Self {
            output_port_id,
            random_minimum,
            random_maximum,
            // Nothing has been sent yet
            num_sends: 0,
            max_num_sends,
            generator: random::thread_rng(),
            exec_state: CompExecState::new(),
            did_perform_send: false,
            control: ControlLayer::default(),
            consensus: Consensus::new(),
        }
    }
}