Files
@ cf26538b25dc
Branch filter:
Location: CSY/reowolf/src/runtime2/scheduler.rs - annotation
cf26538b25dc
4.4 KiB
application/rls-services+xml
architecture for send/recv ports in place
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 1aef293674a6 cf26538b25dc f450ae18ef58 1aef293674a6 1aef293674a6 f450ae18ef58 f450ae18ef58 f450ae18ef58 1aef293674a6 1aef293674a6 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 cf26538b25dc f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 cf26538b25dc f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 cf26538b25dc f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 cf26538b25dc cf26538b25dc f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 8c5d438b0fa3 cf26538b25dc 8c5d438b0fa3 cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc 8c5d438b0fa3 cf26538b25dc 8c5d438b0fa3 8c5d438b0fa3 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 f450ae18ef58 cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc cf26538b25dc f450ae18ef58 f450ae18ef58 1aef293674a6 f450ae18ef58 f450ae18ef58 f450ae18ef58 1aef293674a6 1aef293674a6 | use std::sync::Arc;
use std::time::Duration;
use std::thread;
use crate::ProtocolDescription;
use super::inbox::InboxMessage;
use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
use super::global_store::GlobalStore;
struct Scheduler {
global: Arc<GlobalStore>,
code: Arc<ProtocolDescription>,
}
impl Scheduler {
pub fn new(global: Arc<GlobalStore>, code: Arc<ProtocolDescription>) -> Self {
Self{
global,
code,
}
}
pub fn run(&mut self) {
// Setup global storage and workspaces that are reused for every
// connector that we run
// TODO: @Memory, scheme for reducing allocations if excessive.
let mut delta_state = RunDeltaState::new();
loop {
// TODO: Check if we're supposed to exit
// Retrieve a unit of work
let connector_key = self.global.connector_queue.pop_front();
if connector_key.is_none() {
// TODO: @Performance, needs condition variable for waking up
thread::sleep(Duration::new(1, 0));
continue
}
// We have something to do
let connector_key = connector_key.unwrap();
let connector = self.global.connectors.get_mut(&connector_key);
let mut cur_schedule = ConnectorScheduling::Immediate;
while cur_schedule == ConnectorScheduling::Immediate {
let new_schedule;
// TODO: Check inbox for new message
if connector.is_in_sync_mode() {
// In synchronous mode, so we can expect messages being sent,
// but we never expect the creation of connectors
new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
debug_assert!(delta_state.new_connectors.is_empty());
if !delta_state.outbox.is_empty() {
// There are message to send
for message in delta_state.outbox.drain(..) {
let (inbox_message, target_connector_id) = {
// Note: retrieving a port incurs a read lock
let sending_port = self.global.ports.get(&connector_key, message.sending_port);
(
InboxMessage {
sending_port: sending_port.self_id,
receiving_port: sending_port.peer_id,
sender_prev_branch_id: message.sender_prev_branch_id,
sender_cur_branch_id: message.sender_cur_branch_id,
message: message.message,
},
sending_port.peer_connector,
)
};
let target_connector = self.global.connectors.get_shared(target_connector_id);
target_connector.inbox.insert_message(inbox_message);
// TODO: Check silent state. Queue connector if it was silent
}
}
} else {
// In regular running mode (not in a sync block) we cannot send
// messages but we can create new connectors
new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
debug_assert!(delta_state.outbox.is_empty());
if !delta_state.new_connectors.is_empty() {
// Push all connectors into the global state and queue them
// for execution
for connector in delta_state.new_connectors.drain(..) {
// Create connector, modify all of the ports that
// it now owns, then queue it for execution
let connector_key = self.global.connectors.create(connector);
self.global.connector_queue.push_back(connector_key);
}
}
}
cur_schedule = new_schedule;
}
}
}
}
|