use std::sync::Arc; use std::time::Duration; use std::thread; use crate::ProtocolDescription; use super::connector::{Connector, ConnectorScheduling, RunDeltaState}; use super::global_store::GlobalStore; struct Scheduler { global: Arc, code: Arc, } impl Scheduler { pub fn new(global: Arc, code: Arc) -> Self { Self{ global, code, } } pub fn run(&mut self) { // Setup global storage and workspaces that are reused for every // connector that we run // TODO: @Memory, scheme for reducing allocations if excessive. let mut delta_state = RunDeltaState::new() loop { // TODO: Check if we're supposed to exit // Retrieve a unit of work let connector_key = self.global.pop_key(); if connector_key.is_none() { // TODO: @Performance, needs condition variable for waking up thread::sleep(Duration::new(1, 0)); continue } // We have something to do let connector_key = connector_key.unwrap(); let connector = self.global.get_connector(&connector_key); let mut cur_schedule = ConnectorScheduling::Immediate; while cur_schedule == ConnectorScheduling::Immediate { let new_schedule; if connector.is_in_sync_mode() { // In synchronous mode, so we can expect messages being sent, // but we never expect the creation of connectors new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state); debug_assert!(delta_state.new_connectors.is_empty()); if !delta_state.outbox.is_empty() {} } else { // In regular running mode (not in a sync block) we cannot send // messages but we can create new connectors new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state); debug_assert!(delta_state.outbox.is_empty()); if !delta_state.new_connectors.is_empty() { // Push all connectors into the global state and queue them // for execution } } cur_schedule = new_schedule; } } } }