Files
@ f450ae18ef58
Branch filter:
Location: CSY/reowolf/src/runtime2/scheduler.rs - annotation
f450ae18ef58
2.5 KiB
application/rls-services+xml
merge with rewrite of connector/scheduler
f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 1aef293674a6 f450ae18ef58 1aef293674a6 1aef293674a6 f450ae18ef58 f450ae18ef58 f450ae18ef58 1aef293674a6 1aef293674a6 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 1aef293674a6 f450ae18ef58 f450ae18ef58 f450ae18ef58 1aef293674a6 1aef293674a6 | use std::sync::Arc;
use std::time::Duration;
use std::thread;
use crate::ProtocolDescription;
use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
use super::global_store::GlobalStore;
struct Scheduler {
global: Arc<GlobalStore>,
code: Arc<ProtocolDescription>,
}
impl Scheduler {
pub fn new(global: Arc<GlobalStore>, code: Arc<ProtocolDescription>) -> Self {
Self{
global,
code,
}
}
pub fn run(&mut self) {
// Setup global storage and workspaces that are reused for every
// connector that we run
// TODO: @Memory, scheme for reducing allocations if excessive.
let mut delta_state = RunDeltaState::new()
loop {
// TODO: Check if we're supposed to exit
// Retrieve a unit of work
let connector_key = self.global.pop_key();
if connector_key.is_none() {
// TODO: @Performance, needs condition variable for waking up
thread::sleep(Duration::new(1, 0));
continue
}
// We have something to do
let connector_key = connector_key.unwrap();
let connector = self.global.get_connector(&connector_key);
let mut cur_schedule = ConnectorScheduling::Immediate;
while cur_schedule == ConnectorScheduling::Immediate {
let new_schedule;
if connector.is_in_sync_mode() {
// In synchronous mode, so we can expect messages being sent,
// but we never expect the creation of connectors
new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
debug_assert!(delta_state.new_connectors.is_empty());
if !delta_state.outbox.is_empty() {}
} else {
// In regular running mode (not in a sync block) we cannot send
// messages but we can create new connectors
new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
debug_assert!(delta_state.outbox.is_empty());
if !delta_state.new_connectors.is_empty() {
// Push all connectors into the global state and queue them
// for execution
}
}
cur_schedule = new_schedule;
}
}
}
}
|