From 58dfabd1be9f1fd4df1289d1f2f40c97deaa5a3e 2021-10-14 00:23:18 From: MH Date: 2021-10-14 00:23:18 Subject: [PATCH] moving to laptop --- diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index f8fa2a8c7997917373315e46cdab473febf83d45..af9724fc4040d2aae15a500acdfcc49b4f73c19b 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -51,6 +51,7 @@ pub enum ComponentCreationError { DefinitionNotComponent, InvalidNumArguments, InvalidArgumentType(usize), + UnownedPort, } impl std::fmt::Debug for ProtocolDescription { diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index b6c1c4b3d0221c6223fe21e9be93bfde25cd7a9e..a86101c3ad92818e07791e2e9ef9a23829db3e4f 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -4,7 +4,7 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage}; +use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage}; use crate::runtime2::port::PortIdLocal; /// Represents the identifier of a branch (the index within its container). 
An @@ -52,7 +52,7 @@ pub(crate) struct Branch { sync_state: SpeculativeState, next_branch_in_queue: Option, // Message/port state - inbox: HashMap, // TODO: @temporary, remove together with fires() + received: HashMap, // TODO: @temporary, remove together with fires() ports_delta: Vec, } @@ -66,7 +66,7 @@ impl Branch { code_state: component_state, sync_state: SpeculativeState::RunningNonSync, next_branch_in_queue: None, - inbox: HashMap::new(), + received: HashMap::new(), ports_delta: Vec::new(), } } @@ -85,7 +85,7 @@ impl Branch { code_state: parent_branch.code_state.clone(), sync_state: SpeculativeState::RunningInSync, next_branch_in_queue: None, - inbox: parent_branch.inbox.clone(), + received: parent_branch.received.clone(), ports_delta: parent_branch.ports_delta.clone(), } } @@ -264,7 +264,7 @@ impl ConnectorPublic { // TODO: Maybe prevent false sharing by aligning `public` to next cache line. // TODO: Do this outside of the connector, create a wrapping struct -pub(crate) struct Connector { +pub(crate) struct ConnectorPDL { // State and properties of connector itself id: u32, in_sync: bool, @@ -273,6 +273,7 @@ pub(crate) struct Connector { sync_active: BranchQueue, sync_pending_get: BranchQueue, sync_finished: BranchQueue, + sync_finished_last_handled: u32, // Port/message management pub inbox: PrivateInbox, pub ports: ConnectorPorts, @@ -297,7 +298,7 @@ impl RunContext for TempCtx { } } -impl Connector { +impl ConnectorPDL { /// Constructs a representation of a connector. The assumption is that the /// initial branch is at the first instruction of the connector's code, /// hence is in a non-sync state. 
@@ -309,6 +310,7 @@ impl Connector { sync_active: BranchQueue::new(), sync_pending_get: BranchQueue::new(), sync_finished: BranchQueue::new(), + sync_finished_last_handled: 0, // none at all inbox: PrivateInbox::new(), ports: ConnectorPorts::new(owned_ports), } @@ -318,6 +320,34 @@ impl Connector { return self.in_sync; } + pub fn insert_sync_message(&mut self, message: SyncMessage, results: &mut RunDeltaState) { + + } + + pub fn run(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling { + if self.in_sync { + let scheduling = self.run_in_speculative_mode(pd, results); + + // When in speculative mode we might have generated new sync + // solutions, we need to turn them into proposed solutions here. + if self.sync_finished_last_handled != self.sync_finished.last { + let mut next_id; + if self.sync_finished_last_handled == 0 { + next_id = self.sync_finished.first; + } else { + let last_handled = &self.branches[self.sync_finished_last_handled as usize]; + debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue" + next_id = last_handled.next_branch_in_queue.unwrap(); + } + + // Transform branch into proposed global solution + } + } else { + let scheduling = self.run_in_deterministic_mode(pd, results); + return scheduling; + } + } + /// Runs the connector in synchronous mode. Potential changes to the global /// system's state are added to the `RunDeltaState` object by the connector, /// where it is the caller's responsibility to immediately take care of @@ -400,29 +430,38 @@ impl Connector { // But if some messages can be immediately applied, do so // now. let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id); - if !messages.is_empty() { - // TODO: If message contains ports, transfer ownership of port. 
- for message in messages { - // For each message, for the execution and feed it - // the provided message - let new_branch_index = self.branches.len() as u32; - let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch); - self.ports.prepare_sync_branch(branch.index.index, new_branch_index); - - let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index); - port_mapping.last_registered_branch_id = message.sender_cur_branch_id; - debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1); - - new_branch.inbox.insert(local_port_id, message.clone()); - - // Schedule the new branch - debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index); - self.branches.push(new_branch); - } + let mut did_have_messages = false; + + for message in messages { + did_have_messages = true; + + // For each message prepare a new branch to execute + let new_branch_index = self.branches.len() as u32; + let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch); + self.ports.prepare_sync_branch(branch.index.index, new_branch_index); + + let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index); + port_mapping.last_registered_branch_id = message.sender_cur_branch_id; + debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1); + + new_branch.received.insert(local_port_id, message.clone()); + + // If the message contains any ports then they will now + // be owned by the new branch + debug_assert!(results.ports.is_empty()); + find_ports_in_value_group(&message.message, &mut results.ports); + Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports); + results.ports.clear(); - // Because we have new branches to run, schedule - // immediately + // Schedule the new branch + debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync); + 
Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index); + self.branches.push(new_branch); + } + + if did_have_messages { + // If we did create any new branches, then we can run + // them immediately. return ConnectorScheduling::Immediate; } } else { @@ -431,7 +470,7 @@ impl Connector { }, RunResult::BranchAtSyncEnd => { // Branch is done, go through all of the ports that are not yet - // assigned and modify them to be + // assigned and map them to non-firing. for port_idx in 0..self.ports.num_ports() { let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx); if !port_mapping.is_assigned { @@ -480,6 +519,13 @@ impl Connector { message: value_group, }; + // If the message contains any ports then we release our + // ownership over them in this branch + debug_assert!(results.ports.is_empty()); + find_ports_in_value_group(&message.message, &mut results.ports); + Self::release_ports_during_sync(&mut self.ports, branch, &results.ports); + results.ports.clear(); + results.outbox.push(message); return ConnectorScheduling::Immediate } else { @@ -545,7 +591,7 @@ impl Connector { }; let new_connector_ports = results.ports.clone(); // TODO: Do something with this let new_connector_branch = Branch::new_initial_branch(new_connector_state); - let new_connector = Connector::new(0, new_connector_branch, new_connector_ports); + let new_connector = ConnectorPDL::new(0, new_connector_branch, new_connector_ports); results.new_connectors.push(new_connector); @@ -586,7 +632,9 @@ impl Connector { } #[inline] - fn push_branch_into_queue(branches: &mut Vec, queue: &mut BranchQueue, to_push: BranchId) { + fn push_branch_into_queue( + branches: &mut Vec, queue: &mut BranchQueue, to_push: BranchId, + ) { debug_assert!(to_push.is_valid()); let to_push = to_push.index; @@ -702,6 +750,11 @@ impl Connector { return Ok(()) } + + // Helpers for generating and handling sync messages (and the solutions that + // are described by those sync messages) + 
+ fn generate_initial_solution_for_branch(&self, branch_id: BranchId,) } /// A data structure passed to a connector whose code is being executed that is @@ -712,7 +765,7 @@ pub(crate) struct RunDeltaState { // Variables that allow the thread running the connector to pick up global // state changes and try to apply them. pub outbox: Vec, - pub new_connectors: Vec, + pub new_connectors: Vec, // Workspaces pub ports: Vec, } diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs index 96a30954aeedf71e9f3684d89b0cef2550aff4b1..e71494f58f910e269179d5e47fbdfd28cffca18b 100644 --- a/src/runtime2/global_store.rs +++ b/src/runtime2/global_store.rs @@ -1,6 +1,6 @@ use crate::collections::{MpmcQueue, RawVec}; -use super::connector::{Connector, ConnectorPublic}; +use super::connector::{ConnectorPDL, ConnectorPublic}; use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel}; use super::inbox::PublicInbox; use super::scheduler::Router; @@ -8,6 +8,7 @@ use super::scheduler::Router; use std::ptr; use std::sync::{Barrier, RwLock, RwLockReadGuard}; use std::sync::atomic::AtomicBool; +use crate::runtime2::native::Connector; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this @@ -45,8 +46,15 @@ impl ConnectorId { } } +// TODO: Change this, I hate this. But I also don't want to put `public` and +// `router` of `ScheduledConnector` back into `Connector`. +pub enum ConnectorVariant { + UserDefined(ConnectorPDL), + Native(Box), +} + pub struct ScheduledConnector { - pub connector: Connector, + pub connector: ConnectorVariant, pub public: ConnectorPublic, pub router: Router } @@ -100,7 +108,7 @@ impl ConnectorStore { /// Create a new connector, returning the key that can be used to retrieve /// and/or queue it. 
- pub(crate) fn create(&self, connector: Connector) -> ConnectorKey { + pub(crate) fn create(&self, connector: ConnectorVariant) -> ConnectorKey { let lock = self.inner.write().unwrap(); let connector = ScheduledConnector{ connector, @@ -189,17 +197,12 @@ impl PortStore { } } - pub(crate) fn create_channel(&self, creating_connector: Option) -> Channel { + pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> Channel { let mut lock = self.inner.write().unwrap(); // Reserves a new port. Doesn't point it to its counterpart - fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option) -> u32 { + fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: ConnectorId) -> u32 { let index; - let (ownership, connector_id) = if creating_connector.is_some() { - (PortOwnership::Owned, creating_connector.unwrap()) - } else { - (PortOwnership::Unowned, ConnectorId::new_invalid()) - }; if lock.free.is_empty() { index = lock.ports.len() as u32; @@ -207,7 +210,7 @@ impl PortStore { self_id: PortIdLocal::new(index), peer_id: PortIdLocal::new_invalid(), kind, - ownership, + ownership: PortOwnership::Owned, owning_connector: connector_id, peer_connector: connector_id }); @@ -217,7 +220,7 @@ impl PortStore { port.peer_id = PortIdLocal::new_invalid(); port.kind = kind; - port.ownership = ownership; + port.ownership = PortOwnership::Owned; port.owning_connector = connector_id; port.peer_connector = connector_id; } @@ -238,7 +241,10 @@ impl PortStore { getter_port.peer_id = putter_port.self_id; } - return Channel{ putter_id, getter_id } + return Channel{ + putter_id: PortIdLocal::new(putter_id), + getter_id: PortIdLocal::new(getter_id), + } } } diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index 2d64dd23404f8dea01d3441b5adc99902eaba789..022ab16496f3aa2ab43511267f73f022916e8654 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -42,6 
+42,29 @@ pub struct DataMessage { pub message: ValueGroup, } +pub enum SyncBranchConstraint { + SilentPort(PortIdLocal), + BranchNumber(u32), + PortMapping(PortIdLocal, u32), +} + +pub struct SyncConnectorSolution { + connector_id: ConnectorId, + terminating_branch_id: BranchId, + execution_branch_ids: Vec, // ends with terminating branch ID +} + +pub struct SyncConnectorConstraints { + connector_id: ConnectorId, + constraints: Vec, +} + +pub struct SyncMessage { + connector_solutions: Vec, + connector_constraints: Vec, + connectors_to_visit: Vec, +} + /// A control message. These might be sent by the scheduler to notify eachother /// of asynchronous state changes. pub struct ControlMessage { @@ -59,8 +82,10 @@ pub enum ControlMessageVariant { /// out and handles all control message and potential routing). The correctly /// addressed `Data` variants will end up at the connector. pub enum Message { - Data(DataMessage), - Control(ControlMessage), + Data(DataMessage), // data message, handled by connector + Sync(SyncMessage), // sync message, handled by both connector/scheduler + Control(ControlMessage), // control message, handled by scheduler + Ping, // ping message, intentionally waking up a connector (used for native connectors) } /// The public inbox of a connector. 
The thread running the connector that owns diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index b75ec01ec78f0103f2fa427197c12a89cca226f0..8ac3156cbb45147d161cbe4457f58ea181a64fe1 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -3,6 +3,7 @@ mod runtime; mod messages; mod connector; +mod native; mod port; mod global_store; mod scheduler; @@ -12,87 +13,86 @@ mod inbox; // Imports -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::Ordering; use std::thread::{self, JoinHandle}; use crate::protocol::eval::*; use crate::{common::Id, PortId, ProtocolDescription}; -use global_store::GlobalStore; +use global_store::{ConnectorVariant, GlobalStore}; use scheduler::Scheduler; use crate::protocol::ComponentCreationError; -use crate::runtime2::connector::{Branch, Connector, find_ports_in_value_group}; +use connector::{Branch, ConnectorPDL, find_ports_in_value_group}; +use native::{ConnectorApplication, ApplicationInterface}; // Runtime API +// TODO: Exit condition is very dirty. Take into account: +// - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working +// - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done +// - User-owned interfaces: As long as these are owned user may still decide to create new connectors. 
pub struct Runtime { - global_store: Arc, - protocol_description: Arc, - schedulers: Vec> + inner: Arc, +} + +pub(crate) struct RuntimeInner { + pub(crate) global_store: GlobalStore, + pub(crate) protocol_description: ProtocolDescription, + schedulers: Mutex>>, // TODO: Revise, make exit condition something like: all interfaces dropped } impl Runtime { - pub fn new(num_threads: usize, protocol_description: Arc) -> Runtime { + pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime { // Setup global state assert!(num_threads > 0, "need a thread to run connectors"); - let global_store = Arc::new(GlobalStore::new()); + let runtime_inner = Arc::new(RuntimeInner{ + global_store: GlobalStore::new(), + protocol_description, + schedulers: Mutex::new(Vec::new()), + }); // Launch threads - let mut schedulers = Vec::with_capacity(num_threads); - for _ in 0..num_threads { - let mut scheduler = Scheduler::new(global_store.clone(), protocol_description.clone()); - let thread = thread::spawn(move || { - scheduler.run(); - }); - - schedulers.push(thread); + { + let mut schedulers = Vec::with_capacity(num_threads); + for _ in 0..num_threads { + let mut scheduler = Scheduler::new(runtime_inner.clone()); + let thread = thread::spawn(move || { + scheduler.run(); + }); + + schedulers.push(thread); + } + + let mut lock = runtime_inner.schedulers.lock().unwrap(); + *lock = schedulers; } - // Move innards into runtime struct - return Runtime{ - global_store, - protocol_description, - schedulers, - } + // Return runtime + return Runtime{ inner: runtime_inner }; } - /// Returns (putter port, getter port) - pub fn create_channel(&self) -> (Value, Value) { - let channel = self.global_store.ports.create_channel(None); - let putter_value = Value::Output(PortId(Id{ - connector_id: u32::MAX, - u32_suffix: channel.putter_id, - })); - let getter_value = Value::Input(PortId(Id{ - connector_id: u32::MAX, - u32_suffix: channel.getter_id, - })); - return (putter_value, 
getter_value); - } + /// Returns a new interface through which channels and connectors can be + /// created. + pub fn create_interface(&self) -> ApplicationInterface { + let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); + let connector = Box::new(connector); - pub fn create_connector(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), ComponentCreationError> { - // TODO: Remove component creation function from PD, should not be concerned with it - // Create the connector and mark the ports as now owned by the - // connector - let mut port_ids = Vec::new(); - find_ports_in_value_group(&values, &mut port_ids); - - let component_state = self.protocol_description.new_component_v2(module.as_bytes(), procedure.as_bytes(), values)?; - let connector = Connector::new(0, Branch::new_initial_branch(component_state), port_ids.clone()); - let connector_key = self.global_store.connectors.create(connector); - - for port_id in port_ids { - let port = self.global_store.ports.get(&connector_key, port_id); - port.owning_connector = connector_key.downcast(); - port.peer_connector - // TODO: Note that we immediately need to notify the other side of the connector that - // the port has moved! - } + let connector_key = self.inner.global_store.connectors.create(ConnectorVariant::Native(connector)); + interface.set_connector_id(connector_key.downcast()); + + // Note that we're not scheduling. That is done by the interface in case + // it is actually needed. + return interface; } } impl Drop for Runtime { fn drop(&mut self) { - + self.inner.global_store.should_exit.store(true, Ordering::Release); + let mut schedulers = self.inner.schedulers.lock().unwrap(); + for scheduler in schedulers.drain(..) 
{ + scheduler.join(); + } } } \ No newline at end of file diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs new file mode 100644 index 0000000000000000000000000000000000000000..82b5de08d295ab4ffa43ff00c86d67787eb17d87 --- /dev/null +++ b/src/runtime2/native.rs @@ -0,0 +1,165 @@ +use std::sync::{Arc, Mutex, Condvar}; +use std::cell::Cell; +use std::sync::atomic::Ordering; +use crate::protocol::ComponentCreationError; + +use crate::protocol::eval::ValueGroup; +use crate::ProtocolDescription; +use crate::runtime2::connector::{Branch, find_ports_in_value_group}; +use crate::runtime2::global_store::ConnectorKey; + +use super::RuntimeInner; +use super::global_store::{ConnectorVariant, ConnectorId}; +use super::port::{Channel, PortIdLocal}; +use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState}; +use super::inbox::{Message, DataMessage, SyncMessage}; + +pub trait Connector { + fn insert_data_message(&mut self, message: DataMessage); + fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState); + fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling; +} + +type SyncDone = Arc<(Mutex<bool>, Condvar)>; +type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>; + +enum ApplicationJob { + NewConnector(ConnectorPDL), +} + +/// The connector which an application can directly interface with. One may set +/// up the next synchronous round, and retrieve the data afterwards. 
+pub struct ConnectorApplication { + sync_done: SyncDone, + job_queue: JobQueue, +} + +impl ConnectorApplication { + pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) { + let sync_done = Arc::new(( Mutex::new(false), Condvar::new() )); + let job_queue = Arc::new(Mutex::new(Vec::with_capacity(32))); + + let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() }; + let interface = ApplicationInterface::new(sync_done, job_queue, runtime); + + return (connector, interface); + } +} + +impl Connector for ConnectorApplication { + fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState) { + todo!("handling sync messages in ApplicationConnector"); + } + + fn insert_data_message(&mut self, message: DataMessage) { + todo!("handling messages in ApplicationConnector"); + } + + fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + let mut queue = self.job_queue.lock().unwrap(); + while let Some(job) = queue.pop() { + match job { + ApplicationJob::NewConnector(connector) => { + delta_state.new_connectors.push(connector); + } + } + } + + return ConnectorScheduling::NotNow; + } +} + +/// The interface to a `ConnectorApplication`. This allows setting up the +/// interactions the `ConnectorApplication` performs within a synchronous round. +pub struct ApplicationInterface { + sync_done: SyncDone, + job_queue: JobQueue, + runtime: Arc<RuntimeInner>, + connector_id: ConnectorId, + owned_ports: Vec<PortIdLocal>, +} + +impl ApplicationInterface { + pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self { + return Self{ + sync_done, job_queue, runtime, + connector_id: ConnectorId::new_invalid(), + owned_ports: Vec::new(), + } + } + + /// Creates a new channel. 
+ pub fn create_channel(&mut self) -> Channel { + let channel = self.runtime.global_store.ports.create_channel(self.connector_id); + self.owned_ports.push(channel.putter_id); + self.owned_ports.push(channel.getter_id); + + return channel; + } + + /// Creates a new connector. Note that it is not scheduled immediately, but + /// depends on the `ConnectorApplication` to run, followed by the created + /// connector being scheduled. + // TODO: Optimize by yanking out scheduler logic for common use. + pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> { + // Retrieve ports and make sure that we own the ones that are currently + // specified. This is also checked by the scheduler, but that is done + // asynchronously. + let mut initial_ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut initial_ports); + for port_to_remove in &initial_ports { + match self.owned_ports.iter().position(|v| v == port_to_remove) { + Some(index_to_remove) => { + // We own the port, so give it up (removed value is discarded) + self.owned_ports.remove(index_to_remove); + }, + None => { + // We don't own the port + return Err(ComponentCreationError::UnownedPort); + } + } + } + + let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?; + let connector = ConnectorPDL::new(0, Branch::new_initial_branch(state), initial_ports); + + // Put on job queue + { + let mut queue = self.job_queue.lock().unwrap(); + queue.push(ApplicationJob::NewConnector(connector)); + } + + // Send ping message to wake up connector + let connector = self.runtime.global_store.connectors.get_shared(self.connector_id); + connector.inbox.insert_message(Message::Ping); + let should_wake_up = connector.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_wake_up { + let key = unsafe{ ConnectorKey::from_id(self.connector_id) }; + 
self.runtime.global_store.connector_queue.push_back(key); + } + + return Ok(()); + } + + /// Check if the next sync-round is finished. + pub fn try_wait(&self) -> bool { + let (is_done, _) = &*self.sync_done; + let lock = is_done.lock().unwrap(); + return *lock; + } + + /// Wait until the next sync-round is finished + pub fn wait(&self) { + let (is_done, condition) = &*self.sync_done; + let lock = is_done.lock().unwrap(); + condition.wait_while(lock, |v| !*v); // wait while not done + } + + /// Called by runtime to set associated connector's ID. + pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { + self.connector_id = id; + } +} \ No newline at end of file diff --git a/src/runtime2/port.rs b/src/runtime2/port.rs index f58c6196cffb794ad70cf508abcd8c0b099de85c..43e9d40a5d743309ee20a44059ab0340b91f4ddd 100644 --- a/src/runtime2/port.rs +++ b/src/runtime2/port.rs @@ -50,6 +50,6 @@ pub struct Port { // TODO: Turn port ID into its own type pub struct Channel { - pub putter_id: u32, // can put on it, so from the connector's point of view, this is an output - pub getter_id: u32, // vice versa: can get on it, so an input for the connector + pub putter_id: PortIdLocal, // can put on it, so from the connector's point of view, this is an output + pub getter_id: PortIdLocal, // vice versa: can get on it, so an input for the connector } \ No newline at end of file diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index dd5288e2399b338563d4c1130aa3fbe5f9c05ee5..00a1ba5f112aba960e8d99d6dd7015ce6977ed26 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -5,23 +5,21 @@ use std::time::Duration; use std::thread; use crate::ProtocolDescription; +use crate::runtime2::global_store::ConnectorVariant; +use super::RuntimeInner; use super::port::{PortIdLocal}; use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant}; -use super::connector::{Connector, ConnectorPublic, ConnectorScheduling, RunDeltaState}; +use 
super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; use super::global_store::{ConnectorKey, ConnectorId, GlobalStore}; pub(crate) struct Scheduler { - global: Arc, - code: Arc, + runtime: Arc, } impl Scheduler { - pub fn new(global: Arc, code: Arc) -> Self { - Self{ - global, - code, - } + pub fn new(runtime: Arc) -> Self { + return Self{ runtime }; } pub fn run(&mut self) { @@ -31,12 +29,12 @@ impl Scheduler { 'thread_loop: loop { // Retrieve a unit of work - let connector_key = self.global.connector_queue.pop_front(); + let connector_key = self.runtime.global_store.connector_queue.pop_front(); if connector_key.is_none() { // TODO: @Performance, needs condition or something, and most // def' not sleeping thread::sleep(Duration::new(1, 0)); - if self.global.should_exit.load(Ordering::Acquire) { + if self.runtime.global_store.should_exit.load(Ordering::Acquire) { // Thread exits! break 'thread_loop; } @@ -46,7 +44,7 @@ impl Scheduler { // We have something to do let connector_key = connector_key.unwrap(); - let scheduled = self.global.connectors.get_mut(&connector_key); + let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key); // Keep running until we should no longer immediately schedule the // connector. 
@@ -68,7 +66,7 @@ impl Scheduler { match message.content { ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { // Need to change port target - let port = self.global.ports.get(&connector_key, port_id); + let port = self.runtime.global_store.ports.get(&connector_key, port_id); port.peer_connector = new_target_connector_id; debug_assert!(delta_state.outbox.is_empty()); @@ -86,22 +84,31 @@ impl Scheduler { scheduled.router.handle_ack(message.id); } } - } + }, + Message::Ping => {}, } } // Actually run the connector + // TODO: Revise let new_schedule; - if scheduled.connector.is_in_sync_mode() { - // In synchronous mode, so we can expect messages being sent, - // but we never expect the creation of connectors - new_schedule = scheduled.connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state); - debug_assert!(delta_state.new_connectors.is_empty()); - } else { - // In regular running mode (not in a sync block) we cannot send - // messages but we can create new connectors - new_schedule = scheduled.connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state); - debug_assert!(delta_state.outbox.is_empty()); + match &mut scheduled.connector { + ConnectorVariant::UserDefined(connector) => { + if connector.is_in_sync_mode() { + // In synchronous mode, so we can expect messages being sent, + // but we never expect the creation of connectors + new_schedule = connector.run_in_speculative_mode(&self.runtime.protocol_description, &mut delta_state); + debug_assert!(delta_state.new_connectors.is_empty()); + } else { + // In regular running mode (not in a sync block) we cannot send + // messages but we can create new connectors + new_schedule = connector.run_in_deterministic_mode(&self.runtime.protocol_description, &mut delta_state); + debug_assert!(delta_state.outbox.is_empty()); + } + }, + ConnectorVariant::Native(connector) => { + new_schedule = connector.run(&self.runtime.protocol_description, &mut delta_state); + }, } // Handle all of the 
output from the current run: messages to @@ -118,7 +125,7 @@ impl Scheduler { ConnectorScheduling::Immediate => unreachable!(), ConnectorScheduling::Later => { // Simply queue it again later - self.global.connector_queue.push_back(connector_key); + self.runtime.global_store.connector_queue.push_back(connector_key); }, ConnectorScheduling::NotNow => { // Need to sleep, note that we are the only ones which are @@ -136,7 +143,7 @@ impl Scheduler { .is_ok(); if should_reschedule_self { - self.global.connector_queue.push_back(connector_key); + self.runtime.global_store.connector_queue.push_back(connector_key); } } } @@ -149,7 +156,7 @@ impl Scheduler { if !delta_state.outbox.is_empty() { for message in delta_state.outbox.drain(..) { let (inbox_message, target_connector_id) = { - let sending_port = self.global.ports.get(&connector_key, message.sending_port); + let sending_port = self.runtime.global_store.ports.get(&connector_key, message.sending_port); ( DataMessage { sending_connector: connector_key.downcast(), @@ -170,12 +177,12 @@ impl Scheduler { // Handling any new connectors that were scheduled // TODO: Pool outgoing messages to reduce atomic access if !delta_state.new_connectors.is_empty() { - let cur_connector = self.global.connectors.get_mut(connector_key); + let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key); for new_connector in delta_state.new_connectors.drain(..) { // Add to global registry to obtain key - let new_key = self.global.connectors.create(new_connector); - let new_connector = self.global.connectors.get_mut(&new_key); + let new_key = self.runtime.global_store.connectors.create(ConnectorVariant::UserDefined(new_connector)); + let new_connector = self.runtime.global_store.connectors.get_mut(&new_key); // Each port should be lost by the connector that created the // new one. Note that the creator is the current owner. 
@@ -184,7 +191,7 @@ impl Scheduler { // Modify ownership, retrieve peer connector let (peer_connector_id, peer_port_id) = { - let mut port = self.global.ports.get(connector_key, *port_id); + let mut port = self.runtime.global_store.ports.get(connector_key, *port_id); port.owning_connector = new_key.downcast(); (port.peer_connector, port.peer_id) @@ -199,13 +206,13 @@ impl Scheduler { } // Schedule new connector to run - self.global.connector_queue.push_back(new_key); + self.runtime.global_store.connector_queue.push_back(new_key); } } } pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { - let connector = self.global.connectors.get_shared(connector_id); + let connector = self.runtime.global_store.connectors.get_shared(connector_id); connector.inbox.insert_message(message); let should_wake_up = connector.sleeping @@ -214,7 +221,7 @@ impl Scheduler { if should_wake_up { let key = unsafe { ConnectorKey::from_id(connector_id) }; - self.global.connector_queue.push_back(key); + self.runtime.global_store.connector_queue.push_back(key); } } }