Files
@ cf26538b25dc
Branch filter:
Location: CSY/reowolf/src/runtime2/scheduler.rs
cf26538b25dc
4.4 KiB
application/rls-services+xml
architecture for send/recv ports in place
use std::sync::Arc;
use std::time::Duration;
use std::thread;
use crate::ProtocolDescription;
use super::inbox::InboxMessage;
use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
use super::global_store::GlobalStore;
/// Worker that takes connectors from the shared queue and runs them.
///
/// Both fields are reference-counted handles, so the scheduler shares the
/// underlying data with whoever else holds a clone of the same `Arc`.
struct Scheduler {
    /// Shared runtime state. `run` reaches the connector queue, the
    /// connectors and the ports through this handle.
    global: Arc<GlobalStore>,
    /// Protocol description passed to every connector run.
    code: Arc<ProtocolDescription>,
}
impl Scheduler {
    /// Creates a scheduler that works on the given shared store and hands
    /// the given protocol description to the connectors it runs.
    pub fn new(global: Arc<GlobalStore>, code: Arc<ProtocolDescription>) -> Self {
        Self { global, code }
    }

    /// Runs the scheduling loop.
    ///
    /// Each iteration pops a connector key from the shared queue and keeps
    /// running that connector for as long as it reports
    /// `ConnectorScheduling::Immediate`. Messages produced while the
    /// connector is in sync mode are delivered to the inbox of the peer
    /// connector; connectors created outside of sync mode are registered in
    /// the global store and queued.
    ///
    /// The loop currently has no exit path (see the TODO below), so this
    /// function does not return.
    pub fn run(&mut self) {
        // Scratch state handed to every connector run and reused across
        // iterations instead of being rebuilt each time.
        // TODO: @Memory, scheme for reducing allocations if excessive.
        let mut delta_state = RunDeltaState::new();

        loop {
            // TODO: Check if we're supposed to exit

            // Fetch the next unit of work; when the queue is empty, wait a
            // second and try again.
            let popped = self.global.connector_queue.pop_front();
            let connector_key = match popped {
                Some(key) => key,
                None => {
                    // TODO: @Performance, needs condition variable for waking up
                    thread::sleep(Duration::from_secs(1));
                    continue;
                }
            };

            // There is work: look up the connector that belongs to the key.
            let connector = self.global.connectors.get_mut(&connector_key);

            let mut cur_schedule = ConnectorScheduling::Immediate;
            while cur_schedule == ConnectorScheduling::Immediate {
                // TODO: Check inbox for new message
                cur_schedule = if connector.is_in_sync_mode() {
                    // Sync mode: messages may be sent, but no connectors are
                    // expected to be created.
                    let next_schedule = connector
                        .run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
                    debug_assert!(delta_state.new_connectors.is_empty());

                    // Deliver every outgoing message (no-op when the outbox
                    // is empty).
                    for message in delta_state.outbox.drain(..) {
                        let (inbox_message, target_connector_id) = {
                            // Note: retrieving a port incurs a read lock. The
                            // enclosing block limits how long `port` lives.
                            let port = self
                                .global
                                .ports
                                .get(&connector_key, message.sending_port);
                            let inbox_message = InboxMessage {
                                sending_port: port.self_id,
                                receiving_port: port.peer_id,
                                sender_prev_branch_id: message.sender_prev_branch_id,
                                sender_cur_branch_id: message.sender_cur_branch_id,
                                message: message.message,
                            };
                            (inbox_message, port.peer_connector)
                        };

                        let target_connector =
                            self.global.connectors.get_shared(target_connector_id);
                        target_connector.inbox.insert_message(inbox_message);
                        // TODO: Check silent state. Queue connector if it was silent
                    }

                    next_schedule
                } else {
                    // Regular mode (outside a sync block): no messages may be
                    // sent, but new connectors may be created.
                    let next_schedule = connector
                        .run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
                    debug_assert!(delta_state.outbox.is_empty());

                    // Register each new connector in the global store and
                    // queue it for execution (no-op when none were created).
                    for new_connector in delta_state.new_connectors.drain(..) {
                        let new_key = self.global.connectors.create(new_connector);
                        self.global.connector_queue.push_back(new_key);
                    }

                    next_schedule
                };
            }
        }
    }
}
|