diff --git a/src/common.rs b/src/common.rs index 537487d427612edb0a78dec1f1462bfd040f1db5..08587130bf657da1b637756355705f7d25c40ad5 100644 --- a/src/common.rs +++ b/src/common.rs @@ -54,6 +54,17 @@ pub struct ComponentId(Id); // PUB because it can be returned by errors #[repr(transparent)] pub struct PortId(pub(crate) Id); +impl PortId { + // TODO: Remove concept of ComponentId and PortId in this file + #[deprecated] + pub fn new(port: u32) -> Self { + return PortId(Id{ + connector_id: u32::MAX, + u32_suffix: port, + }); + } +} + /// A safely aliasable heap-allocated payload of message bytes #[derive(Default, Eq, PartialEq, Clone, Ord, PartialOrd)] pub struct Payload(pub Arc>); diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index e951d0d9c29ff1c2f208c3dccd1df388d9d9b54a..419df819b60b039f6930dbfea3d6269311a82723 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -59,6 +59,7 @@ pub(crate) struct Branch { parent_index: BranchId, // Code execution state code_state: ComponentState, + prepared_channel: Option<(Value, Value)>, sync_state: SpeculativeState, next_branch_in_queue: Option, // Message/port state @@ -74,6 +75,7 @@ impl Branch { index: BranchId::new_invalid(), parent_index: BranchId::new_invalid(), code_state: component_state, + prepared_channel: None, sync_state: SpeculativeState::RunningNonSync, next_branch_in_queue: None, received: HashMap::new(), @@ -88,11 +90,13 @@ impl Branch { (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) || (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) ); + debug_assert!(parent_branch.prepared_channel.is_none()); Branch{ index: BranchId::new(new_index), parent_index: parent_branch.index, code_state: parent_branch.code_state.clone(), + prepared_channel: None, sync_state: SpeculativeState::RunningInSync, next_branch_in_queue: None, received: parent_branch.received.clone(), @@ -268,6 +272,7 @@ impl ConnectorPorts { let branch_idx = branch_idx as usize; let num_ports = self.owned_ports.len(); + println!("port_idx = {}, branch_idx = {}, num_ports = {}, port_mapping.len() = {}", port_idx, branch_idx, num_ports, self.port_mapping.len()); debug_assert!(port_idx < num_ports); debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len()); @@ -333,28 +338,31 @@ pub(crate) struct ConnectorPDL { pub ports: ConnectorPorts, } +// TODO: Remove this monstrosity struct ConnectorRunContext<'a> { - inbox: &'a PrivateInbox, + branch_index: u32, ports: &'a ConnectorPorts, - branch: &'a Branch, + ports_delta: &'a Vec, + received: &'a HashMap, scheduler: SchedulerCtx<'a>, + prepared_channel: Option<(Value, Value)>, } impl<'a> RunContext for ConnectorRunContext<'a> { fn did_put(&mut self, port: PortId) -> bool { - if self.branch.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) { + if self.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) { // Either acquired or released, must be silent return false; } let port_index = self.ports.get_port_index(PortIdLocal::new(port.0.u32_suffix)).unwrap(); - let mapping = self.ports.get_port(self.branch.index.index, port_index); + let mapping = self.ports.get_port(self.branch_index, port_index); return mapping.is_assigned; } fn get(&mut self, port: PortId) -> Option { let port_id = PortIdLocal::new(port.0.u32_suffix); - match self.branch.received.get(&port_id) { + match self.received.get(&port_id) { Some(message) => Some(message.message.clone()), None => None, } @@ -362,12 +370,12 @@ impl<'a> RunContext for ConnectorRunContext<'a> { fn fires(&mut self, port: PortId) -> Option { let port_id = PortIdLocal::new(port.0.u32_suffix); - if self.branch.ports_delta.iter().any(|v| v.port_id == port_id) { + if self.ports_delta.iter().any(|v| v.port_id == port_id) { return None } let port_index = self.ports.get_port_index(port_id).unwrap(); - let mapping = self.ports.get_port(self.branch.index.index, port_index); + let mapping = self.ports.get_port(self.branch_index, port_index); if mapping.is_assigned { return Some(Value::Bool(mapping.num_times_fired != 0)); @@ -377,9 +385,7 @@ impl<'a> RunContext for ConnectorRunContext<'a> { } fn get_channel(&mut self) -> Option<(Value, Value)> { - let (getter, putter) = self.scheduler.runtime.create_channel(); - debug_assert_eq!(getter.kind, PortKind::Getter); - + return self.prepared_channel.take(); } } @@ -396,9 +402,9 @@ impl Connector for ConnectorPDL { } } - fn run(&mut self, sched_ctx: &SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { if self.in_sync { - let scheduling = self.run_in_speculative_mode(pd, conn_ctx, delta_state); + let scheduling = self.run_in_speculative_mode(sched_ctx, conn_ctx, delta_state); // When in speculative mode we might have generated new sync // solutions, we need to turn them into proposed solutions here. @@ -447,7 +453,7 @@ impl Connector for ConnectorPDL { return scheduling; } else { - let scheduling = self.run_in_deterministic_mode(pd, conn_ctx, delta_state); + let scheduling = self.run_in_deterministic_mode(sched_ctx, conn_ctx, delta_state); return scheduling; } } @@ -719,15 +725,26 @@ impl ConnectorPDL { /// where it is the caller's responsibility to immediately take care of /// those changes. The return value indicates when (and if) the connector /// needs to be scheduled again. - pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(self.in_sync); - debug_assert!(!self.sync_active.is_empty()); + + if self.sync_active.is_empty() { + return ConnectorScheduling::NotNow; + } let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active); // Run the branch to the next blocking point - let mut run_context = ConnectorRunContext {}; - let run_result = branch.code_state.run(&mut run_context, pd); + debug_assert!(branch.prepared_channel.is_none()); + let mut run_context = ConnectorRunContext { + branch_index: branch.index.index, + ports: &self.ports, + ports_delta: &branch.ports_delta, + scheduler: sched_ctx, + prepared_channel: None, + received: &branch.received, + }; + let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); // Match statement contains `return` statements only if the particular // run result behind handled requires an immediate re-run of the @@ -908,6 +925,9 @@ impl ConnectorPDL { results.ports.clear(); results.outbox.push(MessageContents::Data(message)); + + let branch_index = branch.index; + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, branch_index); return ConnectorScheduling::Immediate } else { branch.sync_state = SpeculativeState::Inconsistent; @@ -926,7 +946,7 @@ impl ConnectorPDL { } /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, sched_ctx: &SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); @@ -935,11 +955,14 @@ impl ConnectorPDL { debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); let mut run_context = ConnectorRunContext{ - inbox: &self.inbox, + branch_index: branch.index.index, ports: &self.ports, - branch: &Branch {} + ports_delta: &branch.ports_delta, + scheduler: sched_ctx, + prepared_channel: branch.prepared_channel.take(), + received: &branch.received, }; - let run_result = branch.code_state.run(&mut run_context, pd); + let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); match run_result { RunResult::ComponentTerminated => { @@ -953,6 +976,7 @@ impl ConnectorPDL { self.in_sync = true; let first_sync_branch = Branch::new_sync_branching_from(1, branch); let first_sync_branch_id = first_sync_branch.index; + self.ports.prepare_sync_branch(0, 1); self.branches.push(first_sync_branch); Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch_id); @@ -973,7 +997,11 @@ impl ConnectorPDL { // Add connector for later execution let new_connector_state = ComponentState { - prompt: Prompt::new(&pd.types, &pd.heap, definition_id, monomorph_idx, arguments) + prompt: Prompt::new( + &sched_ctx.runtime.protocol_description.types, + &sched_ctx.runtime.protocol_description.heap, + definition_id, monomorph_idx, arguments + ) }; let new_connector_ports = results.ports.clone(); // TODO: Do something with this let new_connector_branch = Branch::new_initial_branch(new_connector_state); @@ -985,9 +1013,17 @@ impl ConnectorPDL { }, RunResult::NewChannel => { // Need to prepare a new channel - todo!("adding channels to some global context"); + let (getter, putter) = sched_ctx.runtime.create_channel(conn_ctx.id); + debug_assert_eq!(getter.kind, PortKind::Getter); + branch.prepared_channel = Some(( + Value::Input(PortId::new(putter.self_id.index)), + Value::Output(PortId::new(getter.self_id.index)) + )); - return ConnectorScheduling::Later; + results.new_ports.push(putter); + results.new_ports.push(getter); + + return ConnectorScheduling::Immediate; }, _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), } @@ -1106,6 +1142,10 @@ impl ConnectorPDL { /// Releasing ownership of ports during a sync-session. Will provide an /// error if the port was already used during a sync block. fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + if port_ids.is_empty() { + return Ok(()) + } + todo!("unfinished: add port properties during final solution-commit msgs"); debug_assert!(branch.index.is_valid()); // branch in sync mode @@ -1157,6 +1197,10 @@ impl ConnectorPDL { /// Acquiring ports during a sync-session. fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + if port_ids.is_empty() { + return Ok(()) + } + todo!("unfinished: add port properties during final solution-commit msgs"); debug_assert!(branch.index.is_valid()); // branch in sync mode diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index fd5a8c1f4a1ae39e5134f103a1bcb423e059a189..a98334a747a63370f502708fc830e019e335b43c 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -22,9 +22,10 @@ use crate::ProtocolDescription; use inbox::Message; use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; -use scheduler::{Scheduler, ConnectorCtx, Router}; +use scheduler::{Scheduler, ConnectorCtx, ControlMessageHandler}; use native::{Connector, ConnectorApplication, ApplicationInterface}; use crate::runtime2::port::Port; +use crate::runtime2::scheduler::SchedulerCtx; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this @@ -84,10 +85,10 @@ impl Connector for ConnectorVariant { } } - fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, scheduler_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { match self { - ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state), - ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state), + ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, conn_ctx, delta_state), + ConnectorVariant::Native(c) => c.run(scheduler_ctx, conn_ctx, delta_state), } } } @@ -96,7 +97,9 @@ pub(crate) struct ScheduledConnector { pub connector: ConnectorVariant, // access by connector pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector pub public: ConnectorPublic, // accessible by all schedulers and connectors - pub router: Router, + pub router: ControlMessageHandler, + pub shutting_down: bool, + pub pending_acks: u32, } // ----------------------------------------------------------------------------- @@ -214,11 +217,11 @@ impl RuntimeInner { self.scheduler_notifier.notify_one(); } - // --- Creating ports + // --- Creating/using ports /// Creates a new port pair. Note that these are stored globally like the /// connectors are. Ports stored by components belong to those components. - pub(crate) fn create_channel(&self) -> (Port, Port) { + pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) { use port::{PortIdLocal, PortKind}; let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); @@ -229,18 +232,34 @@ impl RuntimeInner { self_id: getter_id, peer_id: putter_id, kind: PortKind::Getter, - peer_connector: self.connector_id, + peer_connector: creating_connector, }; let putter_port = Port{ self_id: putter_id, peer_id: getter_id, kind: PortKind::Putter, - peer_connector: self.connector_id, + peer_connector: creating_connector, }; return (getter_port, putter_port); } + /// Sends a message to a particular connector. If the connector happened to + /// be sleeping then it will be scheduled for execution. + pub(crate) fn send_message(&self, target_id: ConnectorId, message: Message) { + let target = self.get_component_public(target_id); + target.inbox.insert_message(message); + + let should_wake_up = target.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_wake_up { + let key = unsafe{ ConnectorKey::from_id(target_id) }; + self.push_work(key); + } + } + // --- Creating/retrieving/destroying components pub(crate) fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey { @@ -259,7 +278,7 @@ impl RuntimeInner { // Create as not sleeping, as we'll schedule it immediately let key = { let mut lock = self.connectors.write().unwrap(); - lock.create(ConnectorVariant::UserDefined(connector), true) + lock.create(ConnectorVariant::UserDefined(connector), false) }; // Transfer the ports @@ -283,11 +302,13 @@ impl RuntimeInner { return key; } + #[inline] pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector { let lock = self.connectors.read().unwrap(); return lock.get_private(connector_key); } + #[inline] pub(crate) fn get_component_public(&self, connector_id: ConnectorId) -> &'static ConnectorPublic { let lock = self.connectors.read().unwrap(); return lock.get_public(connector_id); @@ -406,7 +427,9 @@ impl ConnectorStore { connector, context: ConnectorCtx::new(), public: ConnectorPublic::new(initially_sleeping), - router: Router::new(), + router: ControlMessageHandler::new(), + shutting_down: false, + pending_acks: 0, }; let index; diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 5eab60d0def9ecf32d6275a1a1c52568d19bf5cf..f8779bcb9fd6fa1dbea81a7fd38cac5bb8078811 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -21,7 +21,7 @@ pub(crate) trait Connector { fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState); /// Should run the connector's behaviour up until the next blocking point. - fn run(&mut self, sched_ctx: &SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; + fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; } type SyncDone = Arc<(Mutex, Condvar)>; @@ -66,7 +66,7 @@ impl Connector for ConnectorApplication { } } - fn run(&mut self, _sched_ctx: &SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, _sched_ctx: SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop_front() { match job { @@ -112,7 +112,7 @@ impl ApplicationInterface { /// Creates a new channel. pub fn create_channel(&mut self) -> Channel { - let (getter_port, putter_port) = self.runtime.create_channel(); + let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id); debug_assert_eq!(getter_port.kind, PortKind::Getter); let getter_id = getter_port.self_id; let putter_id = putter_port.self_id; diff --git a/src/runtime2/port.rs b/src/runtime2/port.rs index ec1472dea11c434f063d786ca556f96293e9307d..a4503b7ca237a1a045c634fc4210715ef8d9cf61 100644 --- a/src/runtime2/port.rs +++ b/src/runtime2/port.rs @@ -21,7 +21,7 @@ impl PortIdLocal { } } -#[derive(Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq)] pub enum PortKind { Putter, Getter, diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index e9680d4ed5cb218f9167365de80b67554db3b3fc..1c285fb9c3124e9b303d560e87ca230c80b5ae70 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -53,6 +53,8 @@ impl ConnectorCtx { } } +// Because it contains pointers we're going to do a copy by value on this one +#[derive(Clone, Copy)] pub(crate) struct SchedulerCtx<'a> { pub(crate) runtime: &'a RuntimeInner } @@ -62,10 +64,6 @@ pub(crate) struct Scheduler { scheduler_id: u32, } -// Thinking aloud: actual ports should be accessible by connector, but managed -// by the scheduler (to handle rerouting messages). We could just give a read- -// only context, instead of an extra call on the "Connector" trait. - impl Scheduler { pub fn new(runtime: Arc, scheduler_id: u32) -> Self { return Self{ runtime, scheduler_id }; @@ -144,9 +142,12 @@ impl Scheduler { } // Actually run the connector + println!("DEBUG [{}]: Running {} ...", scheduler_id, connector_key.index); + let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime }; let new_schedule = scheduled.connector.run( - &self.runtime.protocol_description, &scheduled.context, &mut delta_state + scheduler_ctx, &scheduled.context, &mut delta_state ); + println!("DEBUG [{}]: ... Finished running {}", scheduler_id, connector_key.index); // Handle all of the output from the current run: messages to // send and connectors to instantiate. @@ -185,9 +186,19 @@ impl Scheduler { } }, ConnectorScheduling::Exit => { - // TODO: Better way of doing this, when exiting then - // connected components must know their channels are invalid - self.runtime.destroy_component(connector_key); + // Prepare for exit. Set the shutdown flag and broadcast + // messages to notify peers of closing channels + scheduled.shutting_down = true; + for port in &scheduled.context.ports { + self.runtime.send_message(port.peer_connector, Message{ + sending_connector: connector_key.downcast(), + receiving_port: port.peer_id, + contents: MessageContents::Control(ControlMessage{ + id: 0, + content: ControlMessageVariant::Ack + }) + }) + } } } } @@ -221,7 +232,7 @@ impl Scheduler { receiving_port: PortIdLocal::new_invalid(), contents: MessageContents::ConfirmCommit(contents.clone()), }; - self.send_message_and_wake_up_if_sleeping(*to_visit, message); + self.runtime.send_message(*to_visit, message); } (ConnectorId::new_invalid(), PortIdLocal::new_invalid()) }, @@ -239,7 +250,7 @@ impl Scheduler { receiving_port: peer_port, contents: message, }; - self.send_message_and_wake_up_if_sleeping(peer_connector, message); + self.runtime.send_message(peer_connector, message); } } } @@ -264,12 +275,13 @@ impl Scheduler { // let the other end of the channel know that the port has // changed location. for port in &new_connector.context.ports { + cur_connector.pending_acks += 1; let reroute_message = cur_connector.router.prepare_reroute( port.self_id, port.peer_id, cur_connector.context.id, port.peer_connector, new_connector.context.id ); - self.send_message_and_wake_up_if_sleeping(port.peer_connector, reroute_message); + self.runtime.send_message(port.peer_connector, reroute_message); } // Schedule new connector to run @@ -281,39 +293,40 @@ impl Scheduler { debug_assert!(delta_state.new_ports.is_empty()); debug_assert!(delta_state.new_connectors.is_empty()); } +} - fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { - let connector = self.runtime.get_component_public(connector_id); +// ----------------------------------------------------------------------------- +// Control messages +// ----------------------------------------------------------------------------- - connector.inbox.insert_message(message); - let should_wake_up = connector.sleeping - .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) - .is_ok(); +struct ControlEntry { + id: u32, + variant: ControlVariant, +} - if should_wake_up { - let key = unsafe { ConnectorKey::from_id(connector_id) }; - self.runtime.push_work(key); - } - } +enum ControlVariant { + ChangedPort(ControlChangedPort), + ClosedChannel(ControlClosedChannel), } -/// Represents a rerouting entry due to a moved port -// TODO: Optimize -struct ReroutedTraffic { - id: u32, // ID of control message - target_port: PortIdLocal, // targeted port +struct ControlChangedPort { + target_port: PortIdLocal, // if send to this port, then reroute source_connector: ConnectorId, // connector we expect messages from - target_connector: ConnectorId, // connector they should be rerouted to + target_connector: ConnectorId, // connector we need to reroute to +} + +struct ControlClosedChannel { + } -pub(crate) struct Router { +pub(crate) struct ControlMessageHandler { id_counter: u32, - active: Vec, + active: Vec, } -impl Router { +impl ControlMessageHandler { pub fn new() -> Self { - Router{ + ControlMessageHandler { id_counter: 0, active: Vec::new(), } @@ -341,7 +354,7 @@ impl Router { return Message{ sending_connector: self_connector_id, - receiving_port: PortIdLocal::new_invalid(), + receiving_port: peer_port_id, contents: MessageContents::Control(ControlMessage{ id, content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id), diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 46dbf2abfc1db253f4f6414d548bba4bac9dae36..4be969ef20327dcbec9388bcd93cd991d53c5fe4 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -14,7 +14,7 @@ fn runtime_for(num_threads: u32, pdl: &str) -> Runtime { #[test] fn test_put_and_get() { - let rt = runtime_for(4, " + let rt = runtime_for(1, " primitive putter(out sender, u32 loops) { u32 index = 0; while (index < loops) {