Files
@ 58dfabd1be9f
Branch filter:
Location: CSY/reowolf/src/runtime2/native.rs
58dfabd1be9f
6.1 KiB
application/rls-services+xml
moving to laptop
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 | use std::sync::{Arc, Mutex, Condvar};
use std::cell::Cell;
use std::sync::atomic::Ordering;
use crate::protocol::ComponentCreationError;
use crate::protocol::eval::ValueGroup;
use crate::ProtocolDescription;
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
use crate::runtime2::global_store::ConnectorKey;
use super::RuntimeInner;
use super::global_store::{ConnectorVariant, ConnectorId};
use super::port::{Channel, PortIdLocal};
use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState};
use super::inbox::{Message, DataMessage, SyncMessage};
/// Interface shared by the connector kinds in this module's runtime: a
/// connector can be handed data and sync messages, and it can be run.
pub trait Connector {
    /// Hands a data message to this connector.
    fn insert_data_message(&mut self, message: DataMessage);

    /// Hands a sync message to this connector. The implementation receives
    /// mutable access to `delta_state` while doing so.
    fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState);

    /// Runs the connector once, with read access to the protocol description
    /// and mutable access to `delta_state`. The returned
    /// [`ConnectorScheduling`] value is the connector's scheduling request.
    fn run(
        &mut self,
        protocol_description: &ProtocolDescription,
        delta_state: &mut RunDeltaState,
    ) -> ConnectorScheduling;
}
/// Shared "sync round done" flag: a mutex-guarded boolean paired with the
/// condition variable that is used to block until the boolean becomes `true`.
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>,
/// A unit of work queued by the application-facing interface for the
/// application connector to process on its next run.
enum ApplicationJob {
    /// A freshly created PDL connector that still has to be handed over to
    /// the runtime.
    NewConnector(ConnectorPDL),
}
/// The connector which an application can directly interface with. One may
/// set up the next synchronous round, and retrieve the data afterwards.
pub struct ConnectorApplication {
    /// "Sync round done" flag, shared with the matching `ApplicationInterface`.
    sync_done: SyncDone,
    /// Jobs queued by the matching `ApplicationInterface`; taken out again in
    /// `run`.
    job_queue: JobQueue,
}
impl ConnectorApplication {
    /// Builds the application connector together with the
    /// `ApplicationInterface` that controls it.
    ///
    /// Both halves share the same "sync done" flag and the same job queue;
    /// the interface additionally keeps the handle to the runtime.
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
        // Flag starts out as "not done"; the job list gets a small initial
        // capacity up front.
        let done_flag = Arc::new((Mutex::new(false), Condvar::new()));
        let jobs = Arc::new(Mutex::new(Vec::with_capacity(32)));

        let connector = Self {
            sync_done: Arc::clone(&done_flag),
            job_queue: Arc::clone(&jobs),
        };
        let interface = ApplicationInterface::new(done_flag, jobs, runtime);

        (connector, interface)
    }
}
impl Connector for ConnectorApplication {
    /// Not implemented yet: reaching this panics through `todo!`.
    fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState) {
        todo!("handling sync messages in ApplicationConnector");
    }

    /// Not implemented yet: reaching this panics through `todo!`.
    fn insert_data_message(&mut self, message: DataMessage) {
        todo!("handling messages in ApplicationConnector");
    }

    /// Empties the job queue, handing every queued connector over to
    /// `delta_state.new_connectors`, and then reports `NotNow`.
    ///
    /// Jobs are taken from the back of the list, so the most recently queued
    /// job is processed first. Panics if the job-queue mutex is poisoned.
    fn run(
        &mut self,
        protocol_description: &ProtocolDescription,
        delta_state: &mut RunDeltaState,
    ) -> ConnectorScheduling {
        let mut pending = self.job_queue.lock().unwrap();

        // Back-to-front, which is the order repeated `pop()` calls would give.
        for job in pending.drain(..).rev() {
            match job {
                ApplicationJob::NewConnector(created) => {
                    delta_state.new_connectors.push(created);
                }
            }
        }

        ConnectorScheduling::NotNow
    }
}
/// The interface to a `ConnectorApplication`. This allows setting up the
/// interactions the `ConnectorApplication` performs within a synchronous
/// round.
pub struct ApplicationInterface {
    /// "Sync round done" flag, shared with the `ConnectorApplication`.
    sync_done: SyncDone,
    /// Jobs handed to the `ConnectorApplication`; filled by
    /// `create_connector`.
    job_queue: JobQueue,
    /// Handle to the runtime this interface operates on.
    runtime: Arc<RuntimeInner>,
    /// ID of the associated connector. Starts out invalid and is filled in
    /// through `set_connector_id`.
    connector_id: ConnectorId,
    /// Ports of channels created through `create_channel` that have not yet
    /// been passed on to a created connector.
    owned_ports: Vec<PortIdLocal>,
}
impl ApplicationInterface {
pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner1>) -> Self {
return Self{
sync_done, job_queue, runtime,
connector_id: ConnectorId::new_invalid(),
owned_ports: Vec::new(),
}
}
/// Creates a new channel.
pub fn create_channel(&mut self) -> Channel {
let channel = self.runtime.global_store.ports.create_channel(self.connector_id);
self.owned_ports.push(channel.putter_id);
self.owned_ports.push(channel.getter_id);
return channel;
}
/// Creates a new connector. Note that it is not scheduled immediately, but
/// depends on the `ApplicationConnector` to run, followed by the created
/// connector being scheduled.
// TODO: Optimize by yanking out scheduler logic for common use.
pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
// Retrieve ports and make sure that we own the ones that are currently
// specified. This is also checked by the scheduler, but that is done
// asynchronously.
let mut initial_ports = Vec::new();
find_ports_in_value_group(&arguments, &mut initial_ports);
for port_to_remove in &initial_ports {
match self.owned_ports.iter().position(|v| v == port_to_remove) {
Some(index_to_remove) => {
// We own the port, so continue
self.owned_ports.remove(index_to_remove)
},
None => {
// We don't own the port
return Err(ComponentCreationError::UnownedPort);
}
}
}
let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
let connector = ConnectorPDL::new(0, Branch::new_initial_branch(state), initial_ports);
// Put on job queue
{
let mut queue = self.job_queue.lock().unwrap();
queue.push(ApplicationJob::NewConnector(connector));
}
// Send ping message to wake up connector
let connector = self.runtime.global_store.connectors.get_shared(self.connector_id);
connector.inbox.insert_message(Message::Ping);
let should_wake_up = connector.sleeping
.compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
.is_ok();
if should_wake_up {
let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
self.runtime.global_store.connector_queue.push_back(key);
}
return Ok(());
}
/// Check if the next sync-round is finished.
pub fn try_wait(&self) -> bool {
let (is_done, _) = &*self.sync_done;
let lock = is_done.lock().unwrap();
return *lock;
}
/// Wait until the next sync-round is finished
pub fn wait(&self) {
let (is_done, condition) = &*self.sync_done;
let lock = is_done.lock().unwrap();
condition.wait_while(lock, |v| !*v); // wait while not done
}
/// Called by runtime to set associated connector's ID.
pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
self.connector_id = id;
}
}
|