Files
@ db1cd53e1755
Branch filter:
Location: CSY/reowolf/src/runtime2/component/component_random.rs
db1cd53e1755
7.2 KiB
application/rls-services+xml
Fix bug by delaying component creation when transferred ports are blocked
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 | use rand::prelude as random;
use rand::RngCore;
use crate::protocol::eval::{ValueGroup, Value};
use crate::runtime2::communication::*;
use super::*;
use super::component::*;
use super::control_layer::*;
use super::consensus::*;
/// TODO: Temporary component to figure out what to do with custom components.
/// This component sends random numbers between two u32 limits
pub struct ComponentRandomU32 {
// Properties for this specific component
output_port_id: PortId,
random_minimum: u32,
random_maximum: u32,
num_sends: u32,
max_num_sends: u32,
generator: random::ThreadRng,
// Generic state-tracking
exec_state: CompExecState,
did_perform_send: bool, // when in sync mode
control: ControlLayer,
consensus: Consensus,
inbox_main: InboxMain, // not used
inbox_backup: InboxBackup, // not used
}
impl Component for ComponentRandomU32 {
fn on_creation(&mut self, _id: CompId, _sched_ctx: &SchedulerCtx) {}
fn on_shutdown(&mut self, sched_ctx: &SchedulerCtx) {}
fn adopt_message(&mut self, _comp_ctx: &mut CompCtx, _message: DataMessage) {
// Impossible since this component does not have any input ports in its
// signature.
unreachable!();
}
fn handle_message(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx, message: Message) {
match message {
Message::Data(_message) => unreachable!(),
Message::Sync(message) => {
let decision = self.consensus.receive_sync_message(sched_ctx, comp_ctx, message);
component::default_handle_sync_decision(sched_ctx, &mut self.exec_state, comp_ctx, decision, &mut self.consensus);
},
Message::Control(message) => {
if let Err(location_and_message) = component::default_handle_control_message(
&mut self.exec_state, &mut self.control, &mut self.consensus,
message, sched_ctx, comp_ctx, &mut self.inbox_main, &mut self.inbox_backup
) {
component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
}
},
Message::Poll => unreachable!(),
}
}
fn run(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut CompCtx) -> CompScheduling {
sched_ctx.info(&format!("Running component ComponentRandomU32 (mode: {:?})", self.exec_state.mode));
match self.exec_state.mode {
CompMode::BlockedGet | CompMode::BlockedSelect |
CompMode::BlockedPutPortsAwaitingAcks | CompMode::BlockedPutPortsReady |
CompMode::NewComponentBlocked => {
// impossible for this component, no input ports and no select
// blocks
unreachable!();
}
CompMode::NonSync => {
// If in non-sync mode then we check if the arguments make sense
// (at some point in the future, this is just a testing
// component).
if self.random_minimum >= self.random_maximum {
// Could throw an evaluation error, but lets just panic
panic!("going to crash 'n burn your system now, please provide valid arguments");
}
if self.num_sends >= self.max_num_sends {
self.exec_state.set_as_start_exit(ExitReason::Termination);
} else {
sched_ctx.info("Entering sync mode");
self.did_perform_send = false;
component::default_handle_sync_start(
&mut self.exec_state, &mut [], sched_ctx, comp_ctx, &mut self.consensus
);
}
return CompScheduling::Immediate;
},
CompMode::Sync => {
// This component just sends a single message, then waits until
// consensus has been reached
if !self.did_perform_send {
sched_ctx.info("Sending random message");
let mut random = self.generator.next_u32() - self.random_minimum;
let random_delta = self.random_maximum - self.random_minimum;
random %= random_delta;
random += self.random_minimum;
let value_group = ValueGroup::new_stack(vec![Value::UInt32(random)]);
let send_result = component::default_send_data_message(
&mut self.exec_state, self.output_port_id,
PortInstruction::NoSource, value_group,
sched_ctx, &mut self.consensus, &mut self.control,
comp_ctx
);
if let Err(location_and_message) = send_result {
component::default_handle_error_for_builtin(&mut self.exec_state, sched_ctx, location_and_message);
return CompScheduling::Immediate
} else {
// Blocked or not, we set `did_perform_send` to true. If
// blocked then the moment we become unblocked (and are back
// at the `Sync` mode) we have sent the message.
let scheduling = send_result.unwrap();
self.did_perform_send = true;
self.num_sends += 1;
return scheduling
}
} else {
// Message was sent, finish this sync round
component::default_handle_sync_end(&mut self.exec_state, sched_ctx, comp_ctx, &mut self.consensus);
return CompScheduling::Requeue;
}
},
CompMode::SyncEnd | CompMode::BlockedPut => return CompScheduling::Sleep,
CompMode::StartExit => return component::default_handle_start_exit(
&mut self.exec_state, &mut self.control, sched_ctx, comp_ctx, &mut self.consensus
),
CompMode::BusyExit => return component::default_handle_busy_exit(
&mut self.exec_state, &self.control, sched_ctx
),
CompMode::Exit => return component::default_handle_exit(&self.exec_state),
}
}
}
impl ComponentRandomU32 {
pub(crate) fn new(arguments: ValueGroup) -> Self {
debug_assert_eq!(arguments.values.len(), 4);
debug_assert!(arguments.regions.is_empty());
let port_id = component::port_id_from_eval(arguments.values[0].as_port_id());
let minimum = arguments.values[1].as_uint32();
let maximum = arguments.values[2].as_uint32();
let num_sends = arguments.values[3].as_uint32();
return Self{
output_port_id: port_id,
random_minimum: minimum,
random_maximum: maximum,
num_sends: 0,
max_num_sends: num_sends,
generator: random::thread_rng(),
exec_state: CompExecState::new(),
did_perform_send: false,
control: ControlLayer::default(),
consensus: Consensus::new(),
inbox_main: Vec::new(),
inbox_backup: Vec::new(),
}
}
}
|