Files @ 4a6883c04294
Branch filter:

Location: CSY/reowolf/src/runtime2/component/component_random.rs - annotation

4a6883c04294 6.9 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
mh
Fix bug related to checking for closed port
113e4349a706
113e4349a706
113e4349a706
971e6293edfb
113e4349a706
113e4349a706
113e4349a706
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
8da52bdbcaa7
113e4349a706
113e4349a706
971e6293edfb
113e4349a706
113e4349a706
971e6293edfb
971e6293edfb
971e6293edfb
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
971e6293edfb
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
971e6293edfb
113e4349a706
113e4349a706
113e4349a706
d7f29db22526
d7f29db22526
d7f29db22526
113e4349a706
113e4349a706
971e6293edfb
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
971e6293edfb
971e6293edfb
971e6293edfb
113e4349a706
113e4349a706
113e4349a706
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
113e4349a706
113e4349a706
d7f29db22526
971e6293edfb
113e4349a706
113e4349a706
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
971e6293edfb
113e4349a706
971e6293edfb
971e6293edfb
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
113e4349a706
use rand::prelude as random;
use rand::RngCore;

use crate::protocol::eval::{ValueGroup, Value};
use crate::runtime2::*;

use super::*;
use super::component::{
    self,
    Component, CompExecState, CompScheduling,
    CompMode, ExitReason
};
use super::control_layer::*;
use super::consensus::*;

/// TODO: Temporary component to figure out what to do with custom components.
///     This component sends random numbers between two u32 limits
pub struct ComponentRandomU32 {
    // Properties for this specific component
    output_port_id: PortId,
    random_minimum: u32,
    random_maximum: u32,
    num_sends: u32,
    max_num_sends: u32,
    generator: random::ThreadRng,
    // Generic state-tracking
    exec_state: CompExecState,
    did_perform_send: bool, // when in sync mode
    control: ControlLayer,
    consensus: Consensus,
}

impl Component for ComponentRandomU32 {
    fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {}

    fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) {}

    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
        // Impossible since this component does not have any input ports in its
        // signature.
        unreachable!();
    }

    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
        match message {
            Message::Data(_message) => unreachable!(),
            Message::Sync(message) => {
                let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
                component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
            },
            Message::Control(message) => {
                if let Err(location_and_message) = component::default_handle_control_message(
                    &mut self.exec_state, &mut self.control, &mut self.consensus,
                    message, sched_ctx, comp_ctx
                ) {
                    component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
                }
            },
            Message::Poll => unreachable!(),
        }
    }

    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
        sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));

        match self.exec_state.mode {
            CompMode::BlockedGet | CompMode::BlockedSelect => {
                // impossible for this component, no input ports and no select
                // blocks
                unreachable!();
            }
            CompMode::NonSync => {
                // If in non-sync mode then we check if the arguments make sense
                // (at some point in the future, this is just a testing
                // component).
                if self.random_minimum >= self.random_maximum {
                    // Could throw an evaluation error, but lets just panic
                    panic!("going to crash 'n burn your system now, please provide valid arguments");
                }

                if self.num_sends >= self.max_num_sends {
                    self.exec_state.set_as_start_exit(ExitReason::Termination);
                } else {
                    sched_ctx.log("Entering sync mode");
                    self.did_perform_send = false;
                    component::default_handle_sync_start(
                        &mut self.exec_state, &mut [], sched_ctx, comp_ctx, &mut self.consensus
                    );
                }

                return CompScheduling::Immediate;
            },
            CompMode::Sync => {
                // This component just sends a single message, then waits until
                // consensus has been reached
                if !self.did_perform_send {
                    sched_ctx.log("Sending random message");
                    let mut random = self.generator.next_u32() - self.random_minimum;
                    let random_delta = self.random_maximum - self.random_minimum;
                    random %= random_delta;
                    random += self.random_minimum;
                    let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]);

                    let send_result = component::default_send_data_message(
                        &mut self.exec_state, self.output_port_id,
                        PortInstruction::NoSource, value_group,
                        sched_ctx, &mut self.consensus, comp_ctx
                    );

                    if let Err(location_and_message) = send_result {
                        component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
                        return CompScheduling::Immediate
                    } else {
                        // Blocked or not, we set `did_perform_send` to true. If
                        // blocked then the moment we become unblocked (and are back
                        // at the `Sync` mode) we have sent the message.
                        let scheduling = send_result.unwrap();
                        self.did_perform_send = true;
                        self.num_sends += 1;
                        return scheduling
                    }
                } else {
                    // Message was sent, finish this sync round
                    component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
                    return CompScheduling::Requeue;
                }
            },
            CompMode::SyncEnd | CompMode::BlockedPut => return CompScheduling::Sleep,
            CompMode::StartExit => return component::default_handle_start_exit(
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
            ),
            CompMode::BusyExit => return component::default_handle_busy_exit(
                &mut self.exec_state, &self.control, sched_ctx
            ),
            CompMode::Exit => return component::default_handle_exit(&self.exec_state),
        }
    }
}

impl ComponentRandomU32 {
    pub(crate) fn new(arguments: ValueGroup) -> Self {
        debug_assert_eq!(arguments.values.len(), 4);
        debug_assert!(arguments.regions.is_empty());
        let port_id = component::port_id_from_eval(arguments.values[0].as_port_id());
        let minimum = arguments.values[1].as_uint32();
        let maximum = arguments.values[2].as_uint32();
        let num_sends = arguments.values[3].as_uint32();

        return Self{
            output_port_id: port_id,
            random_minimum: minimum,
            random_maximum: maximum,
            num_sends: 0,
            max_num_sends: num_sends,
            generator: random::thread_rng(),
            exec_state: CompExecState::new(),
            did_perform_send: false,
            control: ControlLayer::default(),
            consensus: Consensus::new(),
        }
    }
}