diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 8f2ac8ae030d9f193e325dd32274a788ef182fe0..fc11f600d1c486a1f99272569a0437e58c25fb67 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -88,6 +88,7 @@ impl ConnectorCtx { pub(crate) struct Scheduler { runtime: Arc, + scheduler_id: u32, } // Thinking aloud: actual ports should be accessible by connector, but managed @@ -95,32 +96,37 @@ pub(crate) struct Scheduler { // only context, instead of an extra call on the "Connector" trait. impl Scheduler { - pub fn new(runtime: Arc) -> Self { - return Self{ runtime }; + pub fn new(runtime: Arc, scheduler_id: u32) -> Self { + return Self{ runtime, scheduler_id }; } pub fn run(&mut self) { // Setup global storage and workspaces that are reused for every // connector that we run + let scheduler_id = self.scheduler_id; let mut delta_state = RunDeltaState::new(); 'thread_loop: loop { // Retrieve a unit of work - let connector_key = self.runtime.global_store.connector_queue.pop_front(); - if connector_key.is_none() { + let mut connector_key = self.runtime.global_store.connector_queue.pop_front(); + while connector_key.is_none() { // TODO: @Performance, needs condition or something, and most // def' not sleeping + println!("DEBUG [{}]: Nothing to do", scheduler_id); thread::sleep(Duration::new(1, 0)); if self.runtime.global_store.should_exit.load(Ordering::Acquire) { // Thread exits! + println!("DEBUG [{}]: ... So I am quitting", scheduler_id); break 'thread_loop; } + println!("DEBUG [{}]: ... But I'm still running", scheduler_id); continue 'thread_loop; } // We have something to do let connector_key = connector_key.unwrap(); + println!("DEBUG [{}]: Running connector {}", scheduler_id, connector_key.index); let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key); // Keep running until we should no longer immediately schedule the @@ -168,7 +174,7 @@ impl Scheduler { } } else { // Let connector handle message - scheduled.connector.handle_message(message.contents, &scheduled.context, &mut delta_state); + scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state); } } @@ -349,7 +355,8 @@ impl Router { new_owner_connector_id: ConnectorId ) -> Message { let id = self.id_counter; - self.id_counter.overflowing_add(1); + let (new_id_counter, _) = self.id_counter.overflowing_add(1); + self.id_counter = new_id_counter; self.active.push(ReroutedTraffic{ id,