Location: CSY/reowolf/src/runtime2/scheduler.rs
Commit daf15df0f8ca: scaffolding in place for scheduler/runtime
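//! Worker-thread scheduler scaffolding for runtime2. Each `Scheduler` runs
//! on its own thread: it pops a connector from the global work queue,
//! drains the connector's shared inbox (delivering or rerouting messages),
//! runs the connector, applies the resulting `RunDeltaState` (outgoing
//! messages, newly created connectors), and finally re-queues the connector
//! or parks it behind its atomic `sleeping` flag. The `Router` at the
//! bottom of this file tracks ports whose ownership is moving to another
//! connector so that in-flight messages can be forwarded.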
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::thread;
use crate::ProtocolDescription;
use super::port::{PortIdLocal};
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
use super::connector::{Connector, ConnectorPublic, ConnectorScheduling, RunDeltaState};
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
pub(crate) struct Scheduler {
global: Arc<GlobalStore>,
code: Arc<ProtocolDescription>,
}
impl Scheduler {
pub fn new(global: Arc<GlobalStore>, code: Arc<ProtocolDescription>) -> Self {
Self{
global,
code,
}
}
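/// Main scheduler loop; runs until the global `should_exit` flag is set.
/// Per iteration: pop a connector from the work queue, drain its shared
/// inbox, run it for as long as it requests immediate rescheduling, apply
/// the produced `RunDeltaState`, and then re-queue it or put it to sleep.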
pub fn run(&mut self) {
// Setup global storage and workspaces that are reused for every
// connector that we run
let mut delta_state = RunDeltaState::new();
'thread_loop: loop {
// Retrieve a unit of work
let connector_key = self.global.connector_queue.pop_front();
if connector_key.is_none() {
// TODO: @Performance, needs a condition variable or similar;
// definitely should not be sleeping here
thread::sleep(Duration::new(1, 0));
if self.global.should_exit.load(Ordering::Acquire) {
// Thread exits!
break 'thread_loop;
}
continue 'thread_loop;
}
// We have something to do
let connector_key = connector_key.unwrap();
let scheduled = self.global.connectors.get_mut(&connector_key);
// Keep running until we should no longer immediately schedule the
// connector.
let mut cur_schedule = ConnectorScheduling::Immediate;
while cur_schedule == ConnectorScheduling::Immediate {
// Check all the messages that are in the shared inbox
while let Some(message) = scheduled.public.inbox.take_message() {
match message {
Message::Data(message) => {
// Check if we need to reroute, or can just put it
// in the private inbox of the connector
if let Some(other_connector_id) = scheduled.router.should_reroute(&message) {
self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message));
} else {
scheduled.connector.inbox.insert_message(message);
}
},
Message::Control(message) => {
match message.content {
ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
// Need to change port target
let port = self.global.ports.get(&connector_key, port_id);
port.peer_connector = new_target_connector_id;
debug_assert!(delta_state.outbox.is_empty());
// And respond with an Ack
self.send_message_and_wake_up_if_sleeping(
message.sender,
Message::Control(ControlMessage{
id: message.id,
sender: connector_key.downcast(),
content: ControlMessageVariant::Ack,
})
);
},
ControlMessageVariant::Ack => {
scheduled.router.handle_ack(message.id);
}
}
}
}
}
// Actually run the connector
let new_schedule;
if scheduled.connector.is_in_sync_mode() {
// In synchronous mode messages may be sent, but we never
// expect the creation of new connectors
new_schedule = scheduled.connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
debug_assert!(delta_state.new_connectors.is_empty());
} else {
// In regular running mode (not in a sync block) we cannot send
// messages but we can create new connectors
new_schedule = scheduled.connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
debug_assert!(delta_state.outbox.is_empty());
}
// Handle all of the output from the current run: messages to
// send and connectors to instantiate.
self.handle_delta_state(&connector_key, &mut delta_state);
cur_schedule = new_schedule;
}
// If here then the connector does not require immediate execution.
// So enqueue it if requested, and otherwise put it in a sleeping
// state.
match cur_schedule {
ConnectorScheduling::Immediate => unreachable!(),
ConnectorScheduling::Later => {
// Simply queue it again later
self.global.connector_queue.push_back(connector_key);
},
ConnectorScheduling::NotNow => {
// Need to sleep. Note that we are the only ones allowed to
// set the sleeping state to `true`, and since we're currently
// running this connector it must be `false`.
debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
scheduled.public.sleeping.store(true, Ordering::Release);
// A message might have arrived in the meantime from a thread
// that observed the sleeping flag while it was still `false`
// (and hence did not re-queue us), so re-check the inbox:
if !scheduled.public.inbox.is_empty() {
let should_reschedule_self = scheduled.public.sleeping
.compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
.is_ok();
if should_reschedule_self {
self.global.connector_queue.push_back(connector_key);
}
}
}
}
}
}
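/// Applies the side effects of a connector's run: sends the messages in
/// the outbox and registers any newly created connectors, transferring
/// port ownership to them and preparing reroutes for in-flight messages.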
fn handle_delta_state(&mut self, connector_key: &ConnectorKey, delta_state: &mut RunDeltaState) {
// Handling any messages that were sent
if !delta_state.outbox.is_empty() {
for message in delta_state.outbox.drain(..) {
let (inbox_message, target_connector_id) = {
let sending_port = self.global.ports.get(connector_key, message.sending_port);
(
DataMessage {
sending_connector: connector_key.downcast(),
sending_port: sending_port.self_id,
receiving_port: sending_port.peer_id,
sender_prev_branch_id: message.sender_prev_branch_id,
sender_cur_branch_id: message.sender_cur_branch_id,
message: message.message,
},
sending_port.peer_connector,
)
};
self.send_message_and_wake_up_if_sleeping(target_connector_id, Message::Data(inbox_message));
}
}
// Handling any new connectors that were scheduled
// TODO: Pool outgoing messages to reduce atomic access
if !delta_state.new_connectors.is_empty() {
let cur_connector = self.global.connectors.get_mut(connector_key);
for new_connector in delta_state.new_connectors.drain(..) {
// Add to global registry to obtain key
let new_key = self.global.connectors.create(new_connector);
let new_connector = self.global.connectors.get_mut(&new_key);
// Each of the new connector's ports must no longer be owned by
// its creator. In the global port table, however, the creator
// is still registered as the owner, so transfer ownership below.
for port_id in &new_connector.ports.owned_ports {
debug_assert!(!cur_connector.ports.owned_ports.contains(port_id));
// Modify ownership, retrieve peer connector
let (peer_connector_id, peer_port_id) = {
let mut port = self.global.ports.get(connector_key, *port_id);
port.owning_connector = new_key.downcast();
(port.peer_connector, port.peer_id)
};
// Send message that port has changed ownership
let reroute_message = cur_connector.router.prepare_reroute(
*port_id, peer_port_id, connector_key.downcast(), peer_connector_id, new_key.downcast()
);
self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message);
}
// Schedule new connector to run
self.global.connector_queue.push_back(new_key);
}
}
}
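/// Delivers a message to the target connector's shared inbox and, if that
/// connector is sleeping, wakes it up by pushing it back onto the work
/// queue. Winning the compare-exchange on the `sleeping` flag (`true` to
/// `false`) elects exactly one sender as responsible for rescheduling,
/// which is presumably also what justifies conjuring a `ConnectorKey`
/// from a plain `ConnectorId` in the unsafe block below.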
pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
let connector = self.global.connectors.get_shared(connector_id);
connector.inbox.insert_message(message);
let should_wake_up = connector.sleeping
.compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
.is_ok();
if should_wake_up {
let key = unsafe { ConnectorKey::from_id(connector_id) };
self.global.connector_queue.push_back(key);
}
}
}
/// Represents a rerouting entry due to a moved port
// TODO: Optimize
struct ReroutedTraffic {
id: u32, // ID of control message
port: PortIdLocal, // targeted port
source_connector: ConnectorId, // connector we expect messages from
target_connector: ConnectorId, // connector they should be rerouted to
}
pub(crate) struct Router {
id_counter: u32,
active: Vec<ReroutedTraffic>,
}
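// Sketch of the reroute protocol as implemented below (the peer-side
// handling lives in `Scheduler::run`):
//   1. The creating connector moves a port to a new connector and calls
//      `prepare_reroute`, which registers a `ReroutedTraffic` entry.
//   2. The returned `ChangePortPeer` control message is sent to the peer
//      connector of the transferred port.
//   3. Until the peer's `Ack` arrives, data messages from the peer that
//      target the moved port are forwarded to the new owner via
//      `should_reroute`.
//   4. On `Ack`, `handle_ack` drops the entry and rerouting stops.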
impl Router {
pub fn new() -> Self {
Router{
id_counter: 0,
active: Vec::new(),
}
}
/// Prepares rerouting of incoming messages due to changed ownership of a
/// port. The control message returned by this function must be sent to
/// the transferred port's peer connector.
pub fn prepare_reroute(
&mut self,
port_id: PortIdLocal, peer_port_id: PortIdLocal,
self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
new_owner_connector_id: ConnectorId
) -> Message {
let id = self.id_counter;
self.id_counter = self.id_counter.wrapping_add(1);
self.active.push(ReroutedTraffic{
id,
port: port_id,
source_connector: peer_connector_id,
target_connector: new_owner_connector_id,
});
return Message::Control(ControlMessage{
id,
sender: self_connector_id,
content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
});
}
/// Checks whether the supplied message should be rerouted. If so, returns
/// the connector that should receive this message.
pub fn should_reroute(&self, message: &DataMessage) -> Option<ConnectorId> {
for reroute in &self.active {
if reroute.source_connector == message.sending_connector &&
reroute.port == message.receiving_port {
// Need to reroute this message
return Some(reroute.target_connector);
}
}
return None;
}
/// Handles an Ack as an answer to a previously sent control message
pub fn handle_ack(&mut self, id: u32) {
let index = self.active.iter()
.position(|v| v.id == id);
match index {
Some(index) => { self.active.remove(index); },
None => { todo!("handling of nefarious ACKs"); },
}
}
}
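// A minimal, self-contained sketch (not part of the runtime) of the
// sleeping-flag handoff used by `Scheduler::run` and
// `send_message_and_wake_up_if_sleeping`: when parking a connector races
// with an incoming message, the compare-exchange on the `sleeping` flag
// elects exactly one party to (re)schedule the connector.
#[cfg(test)]
mod sleeping_flag_sketch {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;

    #[test]
    fn exactly_one_party_wins_the_wakeup_race() {
        for _ in 0..100 {
            // The connector has just been parked: `sleeping` is `true`.
            let sleeping = Arc::new(AtomicBool::new(true));
            // Two racing parties: a sender that just delivered a message,
            // and the scheduler re-checking its non-empty inbox.
            let handles: Vec<_> = (0..2).map(|_| {
                let flag = sleeping.clone();
                thread::spawn(move || {
                    flag.compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
                        .is_ok()
                })
            }).collect();
            let winners = handles.into_iter()
                .map(|handle| handle.join().unwrap())
                .filter(|&won| won)
                .count();
            assert_eq!(winners, 1, "exactly one party must re-queue the connector");
        }
    }
}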