Changeset - c03e28261b5d
[Not reviewed]
0 4 0
mh - 3 years ago 2022-03-30 15:03:03
contact@maxhenger.nl
Initial implementation of random number component
4 files changed with 65 insertions and 12 deletions:
0 comments (0 inline, 0 general)
src/runtime2/communication.rs
Show inline comments
 
@@ -8,29 +8,24 @@ use super::component::*;
 

	
 
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 
pub struct PortId(pub u32);
 

	
 
impl PortId {
 
    /// This value is not significant, it is chosen to make debugging easier: a
 
    /// very large port number is more likely to shine a light on bugs.
 
    pub fn new_invalid() -> Self {
 
        return Self(u32::MAX);
 
    }
 
}
 

	
 
pub struct CompPortIds {
 
    pub comp: CompId,
 
    pub port: PortId,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 
pub enum PortKind {
 
    Putter,
 
    Getter,
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub enum PortState {
 
    Open,
 
    BlockedDueToPeerChange,
 
    BlockedDueToFullBuffers,
 
    Closed,
src/runtime2/component/component.rs
Show inline comments
 
@@ -259,25 +259,25 @@ pub(crate) fn default_handle_busy_exit(
 
    if control.has_acks_remaining() {
 
        sched_ctx.log("Component busy exiting, still has `Ack`s remaining");
 
        return CompScheduling::Sleep;
 
    } else {
 
        sched_ctx.log("Component busy exiting, now shutting down");
 
        exec_state.mode = CompMode::Exit;
 
        return CompScheduling::Exit;
 
    }
 
}
 

	
 
#[inline]
 
pub(crate) fn default_handle_exit(_exec_state: &CompExecState) -> CompScheduling {
 
    debug_assert_eq!(exec_state.mode, CompMode::Exit);
 
    debug_assert_eq!(_exec_state.mode, CompMode::Exit);
 
    return CompScheduling::Exit;
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Internal messaging/state utilities
 
// -----------------------------------------------------------------------------
 

	
 
/// Handles an `Ack` for the control layer.
 
fn default_handle_ack(
 
    control: &mut ControlLayer, control_id: ControlId,
 
    sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx
 
) {
 
@@ -333,24 +333,25 @@ fn default_send_ack(
 
/// on that port then it will be sent.
 
fn default_handle_unblock_put(
 
    exec_state: &mut CompExecState, consensus: &mut Consensus,
 
    port_handle: LocalPortHandle, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx,
 
) {
 
    let port_info = comp_ctx.get_port_mut(port_handle);
 
    let port_id = port_info.self_id;
 
    debug_assert!(port_info.state.is_blocked());
 
    port_info.state = PortState::Open;
 

	
 
    if exec_state.is_blocked_on_put(port_id) {
 
        // Annotate the message that we're going to send
 
        let port_info = comp_ctx.get_port(port_handle); // for immutable access
 
        debug_assert_eq!(port_info.kind, PortKind::Putter);
 
        let to_send = exec_state.mode_value.take();
 
        let to_send = consensus.annotate_data_message(comp_ctx, port_info, to_send);
 

	
 
        // Retrieve peer to send the message
 
        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
        let peer_info = comp_ctx.get_peer(peer_handle);
 
        peer_info.handle.send_message(sched_ctx, Message::Data(to_send), true);
 

	
 
        exec_state.mode = CompMode::Sync; // because we're blocked on a `put`, we must've started in the sync state.
 
        exec_state.mode_port = PortId::new_invalid();
 
    }
src/runtime2/component/component_ip.rs
Show inline comments
 
use crate::protocol::eval::{ValueGroup, EvalError};
 
use rand::prelude as random;
 
use rand::RngCore;
 

	
 
use crate::protocol::eval::{ValueGroup, Value, EvalError};
 
use crate::runtime2::*;
 

	
 
use super::*;
 
use super::component::{self, Component, CompExecState, CompScheduling, CompMode};
 
use super::control_layer::*;
 
use super::consensus::*;
 

	
 
/// TODO: Temporary component to figure out what to do with custom components.
 
///     This component sends random numbers between two u32 limits
 
pub struct ComponentRandomU32 {
 
    // Properties for this specific component
 
    output_port_id: PortId,
 
    random_minimum: u32,
 
    random_maximum: u32,
 
    generator: random::ThreadRng,
 
    // Generic state-tracking
 
    exec_state: CompExecState,
 
    did_perform_send: bool, // when in sync mode
 
    control: ControlLayer,
 
    consensus: Consensus,
 
}
 

	
 
impl Component for ComponentRandomU32 {
 
    fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
 
        // Impossible since this component does not have any input ports in its
 
        // signature.
 
        unreachable!();
 
    }
 

	
 
    fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
 
@@ -44,28 +50,79 @@ impl Component for ComponentRandomU32 {
 
    fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
 
        sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
 

	
 
        match self.exec_state.mode {
 
            CompMode::BlockedGet | CompMode::BlockedSelect => {
 
                // impossible for this component, no input ports and no select
 
                // blocks
 
                unreachable!();
 
            }
 
            CompMode::NonSync => {
 
                // If in non-sync mode then we check if the arguments make sense
 
                // (at some point in the future, this is just a testing
 
                // component)
 
                // component).
 
                if self.random_minimum >= self.random_maximum {
 
                    // Could throw an evaluation error, but lets just panic
 
                    panic!("going to crash 'n burn your system now, please provide valid arguments");
 
                }
 

	
 
                sched_ctx.log("Entering sync mode");
 
                self.did_perform_send = false;
 
                self.consensus.notify_sync_start(comp_ctx);
 
                self.exec_state.mode = CompMode::Sync;
 
                return Ok(CompScheduling::Immediate);
 
            },
 
            CompMode::Sync => {
 
                // This component just sends a single message, then waits until
 
                // consensus has been reached
 
                if !self.did_perform_send {
 
                    sched_ctx.log("Sending random message");
 
                    let mut random = self.generator.next_u32() - self.random_minimum;
 
                    let random_delta = self.random_maximum - self.random_minimum;
 
                    random %= random_delta;
 
                    random += self.random_minimum;
 
                    let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]);
 

	
 
                    let port_handle = comp_ctx.get_port_handle(self.output_port_id);
 
                    let port_info = comp_ctx.get_port(port_handle);
 

	
 
                    let scheduling = if port_info.state.is_blocked() {
 
                        // Need to wait until we can send the message
 
                        self.exec_state.set_as_blocked_put(self.output_port_id, value_group);
 

	
 
                        CompScheduling::Sleep
 
                    } else {
 
                        let message = self.consensus.annotate_data_message(comp_ctx, port_info, value_group);
 
                        let peer_handle = comp_ctx.get_peer_handle(port_info.peer_comp_id);
 
                        let peer_info = comp_ctx.get_peer(peer_handle);
 
                        peer_info.handle.send_message(sched_ctx, Message::Data(message), true);
 

	
 
                        // Remain in sync mode, but after `did_perform_send` was
 
                        // set to true.
 
                        CompScheduling::Immediate
 
                    };
 

	
 
                    // Blocked or not, we set `did_perform_send` to true. If
 
                    // blocked then the moment we become unblocked (and are back
 
                    // at the `Sync` mode) we have sent the message.
 
                    self.did_perform_send = true;
 
                    return Ok(scheduling)
 
                } else {
 
                    // Message was sent, finish this sync round
 
                    sched_ctx.log("Waiting for consensus");
 
                    self.exec_state.mode = CompMode::SyncEnd;
 
                    let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
                    self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
                    return Ok(CompScheduling::Requeue);
 
                }
 
            },
 
            CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep),
 
            CompMode::StartExit => return Ok(component::default_handle_start_exit(
 
                &mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
 
            )),
 
            CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
 
                &mut self.exec_state, &self.control, sched_ctx
 
            )),
 
            CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
 
        }
 
    }
 
}
 
@@ -73,32 +130,32 @@ impl Component for ComponentRandomU32 {
 
impl ComponentRandomU32 {
 
    pub(crate) fn new(arguments: ValueGroup) -> Self {
 
        debug_assert_eq!(arguments.values.len(), 3);
 
        debug_assert!(arguments.regions.is_empty());
 
        let port_id = component::port_id_from_eval(arguments.values[0].as_port_id());
 
        let minimum = arguments.values[1].as_uint32();
 
        let maximum = arguments.values[2].as_uint32();
 

	
 
        return Self{
 
            output_port_id: port_id,
 
            random_minimum: minimum,
 
            random_maximum: maximum,
 
            generator: random::thread_rng(),
 
            exec_state: CompExecState::new(),
 
            did_perform_send: false,
 
            control: ControlLayer::default(),
 
            consensus: Consensus::new(),
 
        }
 
    }
 

	
 

	
 

	
 
    fn handle_sync_decision(&mut self, _sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        let success = match decision {
 
            SyncRoundDecision::None => return,
 
            SyncRoundDecision::Solution => true,
 
            SyncRoundDecision::Failure => false,
 
        };
 

	
 
        debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
        if success {
 
            self.exec_state.mode = CompMode::NonSync;
 
            self.consensus.notify_sync_decision(decision);
 
        } else {
src/runtime2/component/component_pdl.rs
Show inline comments
 
@@ -198,25 +198,25 @@ impl SelectState {
 

	
 
        if self.candidates_workspace.is_empty() {
 
            return SelectDecision::None;
 
        } else {
 
            let candidate_index = self.random.get_u64() as usize % self.candidates_workspace.len();
 
            return SelectDecision::Case(self.candidates_workspace[candidate_index] as u32);
 
        }
 
    }
 
}
 

	
 
pub(crate) struct CompPDL {
 
    pub exec_state: CompExecState,
 
    pub select_state: SelectState,
 
    select_state: SelectState,
 
    pub prompt: Prompt,
 
    pub control: ControlLayer,
 
    pub consensus: Consensus,
 
    pub sync_counter: u32,
 
    pub exec_ctx: ExecCtx,
 
    // TODO: Temporary field, simulates future plans of having one storage place
 
    //  reserved per port.
 
    // Should be same length as the number of ports. Corresponding indices imply
 
    // message is intended for that port.
 
    pub inbox_main: InboxMain,
 
    pub inbox_backup: Vec<DataMessage>,
 
}
 
@@ -453,25 +453,25 @@ impl CompPDL {
 
    /// immediately
 
    fn handle_sync_end(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut CompCtx) {
 
        sched_ctx.log("Component ending sync mode (now waiting for solution)");
 
        let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
 
        self.exec_state.mode = CompMode::SyncEnd;
 
        self.handle_sync_decision(sched_ctx, comp_ctx, decision);
 
    }
 

	
 
    /// Handles decision from the consensus round. This will cause a change in
 
    /// the internal `Mode`, such that the next call to `run` can take the
 
    /// appropriate next steps.
 
    fn handle_sync_decision(&mut self, sched_ctx: &SchedulerCtx, _comp_ctx: &mut CompCtx, decision: SyncRoundDecision) {
 
        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.mode));
 
        sched_ctx.log(&format!("Handling sync decision: {:?} (in mode {:?})", decision, self.exec_state.mode));
 
        match decision {
 
            SyncRoundDecision::None => {
 
                // No decision yet
 
                return;
 
            },
 
            SyncRoundDecision::Solution => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
 
                self.exec_state.mode = CompMode::NonSync;
 
                self.consensus.notify_sync_decision(decision);
 
            },
 
            SyncRoundDecision::Failure => {
 
                debug_assert_eq!(self.exec_state.mode, CompMode::SyncEnd);
0 comments (0 inline, 0 general)