Files
@ 3b6c40dc10e1
Branch filter:
Location: CSY/reowolf/src/runtime2/component/component_random.rs - annotation
3b6c40dc10e1
6.3 KiB
application/rls-services+xml
Initial TCP component implementation
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d a5594f90afa6 c03e28261b5d a5594f90afa6 135965141596 135965141596 135965141596 a5594f90afa6 a5594f90afa6 a5594f90afa6 be8ea413a49a 135965141596 be8ea413a49a be8ea413a49a be8ea413a49a 42c130e76c4b 42c130e76c4b c03e28261b5d 135965141596 135965141596 c03e28261b5d 135965141596 135965141596 a5594f90afa6 a5594f90afa6 be8ea413a49a 3b6c40dc10e1 3b6c40dc10e1 3b6c40dc10e1 a5594f90afa6 aefbf606d736 aefbf606d736 aefbf606d736 a5594f90afa6 a5594f90afa6 a5594f90afa6 aefbf606d736 135965141596 aefbf606d736 135965141596 c62d6f0cc48a aefbf606d736 aefbf606d736 135965141596 135965141596 135965141596 135965141596 a3a2b16408b1 a3a2b16408b1 aefbf606d736 a5594f90afa6 a5594f90afa6 a5594f90afa6 135965141596 135965141596 135965141596 135965141596 135965141596 135965141596 135965141596 135965141596 135965141596 135965141596 135965141596 c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d 42c130e76c4b 42c130e76c4b 42c130e76c4b 42c130e76c4b 42c130e76c4b 42c130e76c4b 42c130e76c4b 42c130e76c4b 42c130e76c4b c03e28261b5d 135965141596 135965141596 c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d 3b6c40dc10e1 3b6c40dc10e1 3b6c40dc10e1 3b6c40dc10e1 c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d 42c130e76c4b c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c03e28261b5d c62d6f0cc48a c03e28261b5d c03e28261b5d 135965141596 
135965141596 135965141596 135965141596 135965141596 135965141596 135965141596 135965141596 135965141596 135965141596 a5594f90afa6 be8ea413a49a be8ea413a49a be8ea413a49a be8ea413a49a 42c130e76c4b be8ea413a49a 135965141596 be8ea413a49a be8ea413a49a 42c130e76c4b be8ea413a49a aefbf606d736 be8ea413a49a be8ea413a49a be8ea413a49a 42c130e76c4b 42c130e76c4b c03e28261b5d 135965141596 c03e28261b5d 135965141596 135965141596 135965141596 135965141596 a5594f90afa6 | use rand::prelude as random;
use rand::RngCore;
use crate::protocol::eval::{ValueGroup, Value, EvalError};
use crate::runtime2::*;
use super::*;
use super::component::{self, Component, CompExecState, CompScheduling, CompMode};
use super::control_layer::*;
use super::consensus::*;
/// TODO: Temporary component to figure out what to do with custom components.
/// This component sends random numbers between two u32 limits
///
/// Each sync round sends one `Value::UInt32` over the output port. The value
/// is at least `random_minimum` and strictly below `random_maximum`. Once
/// `num_sends` reaches `max_num_sends` the component starts exiting.
pub struct ComponentRandomU32 {
// Properties for this specific component
// Port over which the generated values are sent.
output_port_id: PortId,
// Inclusive lower limit of the generated values.
random_minimum: u32,
// Exclusive upper limit of the generated values; `run` panics when it is
// not strictly larger than `random_minimum`.
random_maximum: u32,
// Number of sends initiated so far (starts at 0, incremented per send).
num_sends: u32,
// Number of sends after which the component moves to `CompMode::StartExit`.
max_num_sends: u32,
// Source of the raw random `u32` values (`next_u32`).
generator: random::ThreadRng,
// Generic state-tracking
// Execution state; its `mode` field drives the state machine in `run`.
exec_state: CompExecState,
// Reset to `false` when a sync round is entered and set to `true` once the
// send for that round has been initiated.
did_perform_send: bool, // when in sync mode
// Handed to the default control-message and exit handlers.
control: ControlLayer,
// Handed to the default sync/send handlers; notified of sync start and end.
consensus: Consensus,
}
impl Component for ComponentRandomU32 {
fn on_creation(&mut self, _sched_ctx: &SchedulerCtx) {
}
fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
// Impossible since this component does not have any input ports in its
// signature.
unreachable!();
}
fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
match message {
Message::Data(_message) => unreachable!(),
Message::Sync(message) => {
let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
},
Message::Control(message) => {
component::default_handle_control_message(
&mut self.exec_state, &mut self.control, &mut self.consensus,
message, sched_ctx, comp_ctx
);
},
Message::Poll => unreachable!(),
}
}
fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> Result<CompScheduling, EvalError> {
sched_ctx.log(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
match self.exec_state.mode {
CompMode::BlockedGet | CompMode::BlockedSelect => {
// impossible for this component, no input ports and no select
// blocks
unreachable!();
}
CompMode::NonSync => {
// If in non-sync mode then we check if the arguments make sense
// (at some point in the future, this is just a testing
// component).
if self.random_minimum >= self.random_maximum {
// Could throw an evaluation error, but lets just panic
panic!("going to crash 'n burn your system now, please provide valid arguments");
}
if self.num_sends >= self.max_num_sends {
self.exec_state.mode = CompMode::StartExit;
} else {
sched_ctx.log("Entering sync mode");
self.did_perform_send = false;
self.consensus.notify_sync_start(comp_ctx);
self.exec_state.mode = CompMode::Sync;
}
return Ok(CompScheduling::Immediate);
},
CompMode::Sync => {
// This component just sends a single message, then waits until
// consensus has been reached
if !self.did_perform_send {
sched_ctx.log("Sending random message");
let mut random = self.generator.next_u32() - self.random_minimum;
let random_delta = self.random_maximum - self.random_minimum;
random %= random_delta;
random += self.random_minimum;
let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]);
let scheduling = component::default_send_data_message(
&mut self.exec_state, self.output_port_id, value_group,
sched_ctx, &mut self.consensus, comp_ctx
);
// Blocked or not, we set `did_perform_send` to true. If
// blocked then the moment we become unblocked (and are back
// at the `Sync` mode) we have sent the message.
self.did_perform_send = true;
self.num_sends += 1;
return Ok(scheduling)
} else {
// Message was sent, finish this sync round
sched_ctx.log("Waiting for consensus");
self.exec_state.mode = CompMode::SyncEnd;
let decision = self.consensus.notify_sync_end(sched_ctx, comp_ctx);
component::default_handle_sync_decision(&mut self.exec_state, decision, &mut self.consensus);
return Ok(CompScheduling::Requeue);
}
},
CompMode::SyncEnd | CompMode::BlockedPut => return Ok(CompScheduling::Sleep),
CompMode::StartExit => return Ok(component::default_handle_start_exit(
&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx
)),
CompMode::BusyExit => return Ok(component::default_handle_busy_exit(
&mut self.exec_state, &self.control, sched_ctx
)),
CompMode::Exit => return Ok(component::default_handle_exit(&self.exec_state)),
}
}
}
impl ComponentRandomU32 {
    /// Builds the component from its evaluated argument group.
    ///
    /// The arguments are read positionally: `[0]` the output port, `[1]` the
    /// minimum, `[2]` the maximum and `[3]` the number of values to send. The
    /// send counter starts at zero and all generic state starts fresh.
    pub(crate) fn new(arguments: ValueGroup) -> Self {
        debug_assert_eq!(arguments.values.len(), 4);
        debug_assert!(arguments.regions.is_empty());

        let values = &arguments.values;
        let output_port_id = component::port_id_from_eval(values[0].as_port_id());
        let random_minimum = values[1].as_uint32();
        let random_maximum = values[2].as_uint32();
        let max_num_sends = values[3].as_uint32();

        Self {
            output_port_id,
            random_minimum,
            random_maximum,
            num_sends: 0,
            max_num_sends,
            generator: random::thread_rng(),
            exec_state: CompExecState::new(),
            did_perform_send: false,
            control: ControlLayer::default(),
            consensus: Consensus::new(),
        }
    }
}
|