use std::sync::Arc; use std::time::Duration; use std::thread; use crate::ProtocolDescription; use super::inbox::InboxMessage; use super::connector::{Connector, ConnectorScheduling, RunDeltaState}; use super::global_store::GlobalStore; struct Scheduler { global: Arc, code: Arc, } impl Scheduler { pub fn new(global: Arc, code: Arc) -> Self { Self{ global, code, } } pub fn run(&mut self) { // Setup global storage and workspaces that are reused for every // connector that we run // TODO: @Memory, scheme for reducing allocations if excessive. let mut delta_state = RunDeltaState::new(); loop { // TODO: Check if we're supposed to exit // Retrieve a unit of work let connector_key = self.global.connector_queue.pop_front(); if connector_key.is_none() { // TODO: @Performance, needs condition variable for waking up thread::sleep(Duration::new(1, 0)); continue } // We have something to do let connector_key = connector_key.unwrap(); let connector = self.global.connectors.get_mut(&connector_key); let mut cur_schedule = ConnectorScheduling::Immediate; while cur_schedule == ConnectorScheduling::Immediate { let new_schedule; // TODO: Check inbox for new message if connector.is_in_sync_mode() { // In synchronous mode, so we can expect messages being sent, // but we never expect the creation of connectors new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state); debug_assert!(delta_state.new_connectors.is_empty()); if !delta_state.outbox.is_empty() { // There are message to send for message in delta_state.outbox.drain(..) { let (inbox_message, target_connector_id) = { // Note: retrieving a port incurs a read lock let sending_port = self.global.ports.get(&connector_key, message.sending_port); ( InboxMessage { sending_port: sending_port.self_id, receiving_port: sending_port.peer_id, sender_prev_branch_id: message.sender_prev_branch_id, sender_cur_branch_id: message.sender_cur_branch_id, message: message.message, }, sending_port.peer_connector, ) }; let target_connector = self.global.connectors.get_shared(target_connector_id); target_connector.inbox.insert_message(inbox_message); // TODO: Check silent state. Queue connector if it was silent } } } else { // In regular running mode (not in a sync block) we cannot send // messages but we can create new connectors new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state); debug_assert!(delta_state.outbox.is_empty()); if !delta_state.new_connectors.is_empty() { // Push all connectors into the global state and queue them // for execution for connector in delta_state.new_connectors.drain(..) { // Create connector, modify all of the ports that // it now owns, then queue it for execution let connector_key = self.global.connectors.create(connector); self.global.connector_queue.push_back(connector_key); } } } cur_schedule = new_schedule; } } } }