Files
@ ecc47971d535
Branch filter:
Location: CSY/reowolf/src/runtime2/native.rs
ecc47971d535
8.1 KiB
application/rls-services+xml
WIP on handling sync solution messages
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::Ordering;
use crate::protocol::ComponentCreationError;
use crate::protocol::eval::ValueGroup;
use super::{ConnectorKey, ConnectorId, RuntimeInner};
use super::scheduler::{SchedulerCtx, ComponentCtxFancy, ReceivedMessage};
use super::port::{Port, PortIdLocal, Channel, PortKind};
use super::connector::{Branch, ConnectorScheduling, ConnectorPDL};
use super::connector::find_ports_in_value_group;
use super::inbox::{Message, MessageContents};
/// Generic connector interface from the scheduler's point of view.
///
/// In this file the trait is implemented by `ConnectorApplication`.
pub(crate) trait Connector {
/// Should run the connector's behaviour up until the next blocking point.
/// One should generally request and handle new messages from the component
/// context. Then perform any logic the component has to do, and in the
/// process perhaps queue up some state changes using the same context.
///
/// The returned `ConnectorScheduling` is the connector's answer to the
/// scheduler; the implementation in this file returns `NotNow` once it has
/// nothing left to do and `Exit` when it has been asked to shut down.
fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling;
}
/// Shared "sync round finished" flag together with the condition variable
/// that `ApplicationInterface::wait` blocks on until the flag is `true`.
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
/// Queue of jobs shared between the `ApplicationInterface` (which pushes to
/// the back) and the `ConnectorApplication` (which pops from the front).
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;
/// Work items handed from the `ApplicationInterface` to the
/// `ConnectorApplication` through the shared job queue.
enum ApplicationJob {
/// A freshly created port pair (pushed as `(getter, putter)`) that the API
/// connector has to adopt into its component context.
NewChannel((Port, Port)),
/// A newly constructed PDL connector that has to be handed to the
/// component context.
NewConnector(ConnectorPDL),
/// Queued when the `ApplicationInterface` is dropped; makes the API
/// connector's `run` return `ConnectorScheduling::Exit`.
Shutdown,
}
/// The connector which an application can directly interface with. One may set
/// up the next synchronous round, and retrieve the data afterwards.
///
/// It is created together with its `ApplicationInterface` by
/// `ConnectorApplication::new`; both halves share the same flag and job queue.
pub struct ConnectorApplication {
// Flag + condition variable shared with the `ApplicationInterface`.
sync_done: SyncDone,
// Jobs queued by the `ApplicationInterface`, drained in `run`.
job_queue: JobQueue,
}
impl ConnectorApplication {
    /// Builds the API connector together with the `ApplicationInterface` that
    /// controls it. Both halves hold handles to the same "sync done" flag and
    /// the same job queue; the interface additionally keeps the runtime.
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
        // Shared state: completion flag (initially not done) and a job queue
        // with room for 32 jobs before it has to grow.
        let sync_done = Arc::new((Mutex::new(false), Condvar::new()));
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));

        // The interface gets its own handles, the connector keeps the originals.
        let interface = ApplicationInterface::new(
            Arc::clone(&sync_done),
            Arc::clone(&job_queue),
            runtime,
        );

        (ConnectorApplication { sync_done, job_queue }, interface)
    }
}
impl Connector for ConnectorApplication {
    /// Runs the API connector once: first drains the inbox, then executes
    /// every job the application queued up.
    ///
    /// Receiving any message is not implemented yet and panics through
    /// `todo!()`. Returns `ConnectorScheduling::Exit` when a `Shutdown` job is
    /// encountered and `ConnectorScheduling::NotNow` once the job queue is
    /// empty.
    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
        // Incoming messages, in case we're participating in a round
        while let Some(received) = comp_ctx.read_next_message() {
            match received {
                ReceivedMessage::Data(_) => todo!("data message in API connector"),
                ReceivedMessage::Sync(_)
                | ReceivedMessage::RequestCommit(_)
                | ReceivedMessage::ConfirmCommit(_) => todo!("sync message in API connector"),
            }
        }

        // Requests coming from the API. The queue stays locked while the jobs
        // are processed; the guard is released when we return.
        let mut pending_jobs = self.job_queue.lock().unwrap();
        loop {
            match pending_jobs.pop_front() {
                Some(ApplicationJob::NewChannel((first_port, second_port))) => {
                    println!("DEBUG: API adopting ports");
                    comp_ctx.push_port(first_port);
                    comp_ctx.push_port(second_port);
                }
                Some(ApplicationJob::NewConnector(new_connector)) => {
                    println!("DEBUG: API creating connector");
                    comp_ctx.push_component(new_connector);
                }
                Some(ApplicationJob::Shutdown) => {
                    // Shutdown is expected to be the very last job
                    debug_assert!(pending_jobs.is_empty());
                    return ConnectorScheduling::Exit;
                }
                None => return ConnectorScheduling::NotNow,
            }
        }
    }
}
/// The interface to a `ConnectorApplication`. This allows setting up the
/// interactions the `ConnectorApplication` performs within a synchronous round.
///
/// Dropping the interface queues a `Shutdown` job for the connector.
pub struct ApplicationInterface {
// Flag + condition variable shared with the `ConnectorApplication`.
sync_done: SyncDone,
// Jobs for the `ConnectorApplication`; this side only pushes.
job_queue: JobQueue,
// Runtime used to create channels/components and to schedule the connector.
runtime: Arc<RuntimeInner>,
// ID of the associated connector; invalid until `set_connector_id` is called.
connector_id: ConnectorId,
// Ports created through `create_channel` that were not yet handed to a
// connector in `create_connector`.
owned_ports: Vec<PortIdLocal>,
}
impl ApplicationInterface {
    /// Builds the interface around the shared "sync done" flag, the shared job
    /// queue and the runtime. The connector ID starts out invalid (see
    /// `set_connector_id`) and no ports are owned yet.
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
        Self {
            sync_done,
            job_queue,
            runtime,
            connector_id: ConnectorId::new_invalid(),
            owned_ports: Vec::new(),
        }
    }

    /// Creates a new channel.
    ///
    /// The port pair is queued as a `NewChannel` job for the
    /// `ConnectorApplication` and both port IDs are remembered as owned by this
    /// interface. Note that the connector is not woken up here; the job is
    /// picked up the next time the connector runs.
    pub fn create_channel(&mut self) -> Channel {
        let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id);
        debug_assert_eq!(getter_port.kind, PortKind::Getter);
        let getter_id = getter_port.self_id;
        let putter_id = putter_port.self_id;

        {
            let mut queue = self.job_queue.lock().unwrap();
            queue.push_back(ApplicationJob::NewChannel((getter_port, putter_port)));
        }

        // Add to owned ports for error checking while creating a connector
        self.owned_ports.reserve(2);
        self.owned_ports.push(putter_id);
        self.owned_ports.push(getter_id);

        Channel { putter_id, getter_id }
    }

    /// Creates a new connector. Note that it is not scheduled immediately, but
    /// depends on the `ConnectorApplication` to run, followed by the created
    /// connector being scheduled.
    ///
    /// Every port found in `arguments` must currently be owned by this
    /// interface; ownership of those ports is given up once the connector has
    /// been created successfully.
    ///
    /// # Errors
    ///
    /// Returns `ComponentCreationError::UnownedPort` if `arguments` contains a
    /// port this interface does not own (a port appearing twice counts as
    /// unowned the second time), or whatever error `new_component_v2` reports.
    /// On any error the set of owned ports is left unchanged.
    // TODO: Yank out scheduler logic for common use.
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
        // Retrieve ports and make sure that we own the ones that are currently
        // specified. This is also checked by the scheduler, but that is done
        // asynchronously.
        let mut initial_ports = Vec::new();
        find_ports_in_value_group(&arguments, &mut initial_ports);

        // Do the bookkeeping on a copy: if anything below fails we must not
        // have dropped ports from `owned_ports` that were never handed over.
        let mut remaining_ports = self.owned_ports.clone();
        for port_to_remove in &initial_ports {
            match remaining_ports.iter().position(|owned| owned == port_to_remove) {
                Some(index_to_remove) => {
                    // We own the port, so continue
                    remaining_ports.remove(index_to_remove);
                }
                None => {
                    // We don't own the port
                    return Err(ComponentCreationError::UnownedPort);
                }
            }
        }

        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
        let connector = ConnectorPDL::new(Branch::new_initial_branch(state), initial_ports);

        // Nothing can fail from here on: commit the ownership transfer.
        self.owned_ports = remaining_ports;

        // Put on job queue
        {
            let mut queue = self.job_queue.lock().unwrap();
            queue.push_back(ApplicationJob::NewConnector(connector));
        }

        self.wake_up_connector_with_ping();

        Ok(())
    }

    /// Check if the next sync-round is finished.
    ///
    /// Returns the current value of the shared flag without blocking on the
    /// condition variable.
    pub fn try_wait(&self) -> bool {
        let (is_done, _) = &*self.sync_done;
        let is_done_now = *is_done.lock().unwrap();
        is_done_now
    }

    /// Wait until the next sync-round is finished
    ///
    /// Blocks the calling thread on the condition variable for as long as the
    /// shared flag is `false`.
    // NOTE(review): nothing in this file sets the flag or notifies the
    // condition variable; presumably that is done elsewhere — confirm.
    pub fn wait(&self) {
        let (is_done, condition) = &*self.sync_done;
        let lock = is_done.lock().unwrap();
        let _done_guard = condition.wait_while(lock, |done| !*done).unwrap(); // wait while not done
    }

    /// Called by runtime to set associated connector's ID.
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
        self.connector_id = id;
    }

    /// Puts a `Ping` message into the associated connector's inbox and, if the
    /// connector was marked as sleeping, clears that mark and pushes the
    /// connector onto the runtime's work queue.
    fn wake_up_connector_with_ping(&self) {
        let connector = self.runtime.get_component_public(self.connector_id);
        connector.inbox.insert_message(Message {
            sending_connector: ConnectorId::new_invalid(),
            receiving_port: PortIdLocal::new_invalid(),
            contents: MessageContents::Ping,
        });

        // Only the caller that flips `sleeping` from true to false schedules
        // the connector.
        let should_wake_up = connector.sleeping
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
            .is_ok();

        if should_wake_up {
            println!("DEBUG: Waking up connector");
            // SAFETY: NOTE(review) — the contract of `ConnectorKey::from_id` is
            // not visible here; presumably winning the `sleeping` exchange
            // above is what entitles us to create the key. Confirm.
            let key = unsafe { ConnectorKey::from_id(self.connector_id) };
            self.runtime.push_work(key);
        } else {
            println!("DEBUG: NOT waking up connector");
        }
    }
}
impl Drop for ApplicationInterface {
    /// Tells the associated connector to shut down: queues a `Shutdown` job,
    /// pings the connector so it gets to run, and finally reports to the
    /// runtime that one interface less is active.
    fn drop(&mut self) {
        // The lock guard is a temporary, so the queue is unlocked again before
        // the connector is woken up.
        self.job_queue.lock().unwrap().push_back(ApplicationJob::Shutdown);

        self.wake_up_connector_with_ping();
        self.runtime.decrement_active_interfaces();
    }
}
|