Changeset - 58dfabd1be9f
[Not reviewed]
0 7 1
MH - 4 years ago 2021-10-14 00:23:18
contact@maxhenger.nl
moving to laptop
8 files changed with 363 insertions and 106 deletions:
0 comments (0 inline, 0 general)
src/protocol/mod.rs
Show inline comments
 
@@ -51,6 +51,7 @@ pub enum ComponentCreationError {
 
    DefinitionNotComponent,
 
    InvalidNumArguments,
 
    InvalidArgumentType(usize),
 
    UnownedPort,
 
}
 

	
 
impl std::fmt::Debug for ProtocolDescription {
src/runtime2/connector.rs
Show inline comments
 
@@ -4,7 +4,7 @@ use std::sync::atomic::AtomicBool;
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage};
 
use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage, DataMessage, SyncMessage};
 
use crate::runtime2::port::PortIdLocal;
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
@@ -52,7 +52,7 @@ pub(crate) struct Branch {
 
    sync_state: SpeculativeState,
 
    next_branch_in_queue: Option<u32>,
 
    // Message/port state
 
    inbox: HashMap<PortIdLocal, OutgoingMessage>, // TODO: @temporary, remove together with fires()
 
    received: HashMap<PortIdLocal, DataMessage>, // TODO: @temporary, remove together with fires()
 
    ports_delta: Vec<PortOwnershipDelta>,
 
}
 

	
 
@@ -66,7 +66,7 @@ impl Branch {
 
            code_state: component_state,
 
            sync_state: SpeculativeState::RunningNonSync,
 
            next_branch_in_queue: None,
 
            inbox: HashMap::new(),
 
            received: HashMap::new(),
 
            ports_delta: Vec::new(),
 
        }
 
    }
 
@@ -85,7 +85,7 @@ impl Branch {
 
            code_state: parent_branch.code_state.clone(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            next_branch_in_queue: None,
 
            inbox: parent_branch.inbox.clone(),
 
            received: parent_branch.received.clone(),
 
            ports_delta: parent_branch.ports_delta.clone(),
 
        }
 
    }
 
@@ -264,7 +264,7 @@ impl ConnectorPublic {
 

	
 
// TODO: Maybe prevent false sharing by aligning `public` to next cache line.
 
// TODO: Do this outside of the connector, create a wrapping struct
 
pub(crate) struct Connector {
 
pub(crate) struct ConnectorPDL {
 
    // State and properties of connector itself
 
    id: u32,
 
    in_sync: bool,
 
@@ -273,6 +273,7 @@ pub(crate) struct Connector {
 
    sync_active: BranchQueue,
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    sync_finished_last_handled: u32,
 
    // Port/message management
 
    pub inbox: PrivateInbox,
 
    pub ports: ConnectorPorts,
 
@@ -297,7 +298,7 @@ impl RunContext for TempCtx {
 
    }
 
}
 

	
 
impl Connector {
 
impl ConnectorPDL {
 
    /// Constructs a representation of a connector. The assumption is that the
 
    /// initial branch is at the first instruction of the connector's code,
 
    /// hence is in a non-sync state.
 
@@ -309,6 +310,7 @@ impl Connector {
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            sync_finished_last_handled: 0, // none at all
 
            inbox: PrivateInbox::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
        }
 
@@ -318,6 +320,34 @@ impl Connector {
 
        return self.in_sync;
 
    }
 

	
 
    pub fn insert_sync_message(&mut self, message: SyncMessage, results: &mut RunDeltaState) {
 

	
 
    }
 

	
 
    pub fn run(&mut self, pd: &ProtocolDescription, results: &mut RunDeltaState) -> ConnectorScheduling {
 
        if self.in_sync {
 
            let scheduling = self.run_in_speculative_mode(pd, results);
 

	
 
            // When in speculative mode we might have generated new sync
 
            // solutions, we need to turn them into proposed solutions here.
 
            if self.sync_finished_last_handled != self.sync_finished.last {
 
                let mut next_id;
 
                if self.sync_finished_last_handled == 0 {
 
                    next_id = self.sync_finished.first;
 
                } else {
 
                    let last_handled = &self.branches[self.sync_finished_last_handled as usize];
 
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
 
                    next_id = last_handled.next_branch_in_queue.unwrap();
 
                }
 

	
 
                // Transform branch into proposed global solution
 
            }
 
        } else {
 
            let scheduling = self.run_in_deterministic_mode(pd, results);
 
            return scheduling;
 
        }
 
    }
 

	
 
    /// Runs the connector in synchronous mode. Potential changes to the global
 
    /// system's state are added to the `RunDeltaState` object by the connector,
 
    /// where it is the caller's responsibility to immediately take care of
 
@@ -400,11 +430,12 @@ impl Connector {
 
                    // But if some messages can be immediately applied, do so
 
                    // now.
 
                    let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id);
 
                    if !messages.is_empty() {
 
                        // TODO: If message contains ports, transfer ownership of port.
 
                    let mut did_have_messages = false;
 

	
 
                    for message in messages {
 
                            // For each message, for the execution and feed it
 
                            // the provided message
 
                        did_have_messages = true;
 

	
 
                        // For each message prepare a new branch to execute
 
                        let new_branch_index = self.branches.len() as u32;
 
                        let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch);
 
                        self.ports.prepare_sync_branch(branch.index.index, new_branch_index);
 
@@ -413,7 +444,14 @@ impl Connector {
 
                        port_mapping.last_registered_branch_id = message.sender_cur_branch_id;
 
                        debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1);
 

	
 
                            new_branch.inbox.insert(local_port_id, message.clone());
 
                        new_branch.received.insert(local_port_id, message.clone());
 

	
 
                        // If the message contains any ports then they will now
 
                        // be owned by the new branch
 
                        debug_assert!(results.ports.is_empty());
 
                        find_ports_in_value_group(&message.message, &mut results.ports);
 
                        Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports);
 
                        results.ports.clear();
 

	
 
                        // Schedule the new branch
 
                        debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync);
 
@@ -421,8 +459,9 @@ impl Connector {
 
                        self.branches.push(new_branch);
 
                    }
 

	
 
                        // Because we have new branches to run, schedule
 
                        // immediately
 
                    if did_have_messages {
 
                        // If we did create any new branches, then we can run
 
                        // them immediately.
 
                        return ConnectorScheduling::Immediate;
 
                    }
 
                } else {
 
@@ -431,7 +470,7 @@ impl Connector {
 
            },
 
            RunResult::BranchAtSyncEnd => {
 
                // Branch is done, go through all of the ports that are not yet
 
                // assigned and modify them to be
 
                // assigned and map them to non-firing.
 
                for port_idx in 0..self.ports.num_ports() {
 
                    let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx);
 
                    if !port_mapping.is_assigned {
 
@@ -480,6 +519,13 @@ impl Connector {
 
                        message: value_group,
 
                    };
 

	
 
                    // If the message contains any ports then we release our
 
                    // ownership over them in this branch
 
                    debug_assert!(results.ports.is_empty());
 
                    find_ports_in_value_group(&message.message, &mut results.ports);
 
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports);
 
                    results.ports.clear();
 

	
 
                    results.outbox.push(message);
 
                    return ConnectorScheduling::Immediate
 
                } else {
 
@@ -545,7 +591,7 @@ impl Connector {
 
                };
 
                let new_connector_ports = results.ports.clone(); // TODO: Do something with this
 
                let new_connector_branch = Branch::new_initial_branch(new_connector_state);
 
                let new_connector = Connector::new(0, new_connector_branch, new_connector_ports);
 
                let new_connector = ConnectorPDL::new(0, new_connector_branch, new_connector_ports);
 

	
 
                results.new_connectors.push(new_connector);
 

	
 
@@ -586,7 +632,9 @@ impl Connector {
 
    }
 

	
 
    #[inline]
 
    fn push_branch_into_queue(branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_push: BranchId) {
 
    fn push_branch_into_queue(
 
        branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_push: BranchId,
 
    ) {
 
        debug_assert!(to_push.is_valid());
 
        let to_push = to_push.index;
 

	
 
@@ -702,6 +750,11 @@ impl Connector {
 

	
 
        return Ok(())
 
    }
 

	
 
    // Helpers for generating and handling sync messages (and the solutions that
 
    // are described by those sync messages)
 

	
 
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId,)
 
}
 

	
 
/// A data structure passed to a connector whose code is being executed that is
 
@@ -712,7 +765,7 @@ pub(crate) struct RunDeltaState {
 
    // Variables that allow the thread running the connector to pick up global
 
    // state changes and try to apply them.
 
    pub outbox: Vec<OutgoingMessage>,
 
    pub new_connectors: Vec<Connector>,
 
    pub new_connectors: Vec<ConnectorPDL>,
 
    // Workspaces
 
    pub ports: Vec<PortIdLocal>,
 
}
src/runtime2/global_store.rs
Show inline comments
 
use crate::collections::{MpmcQueue, RawVec};
 

	
 
use super::connector::{Connector, ConnectorPublic};
 
use super::connector::{ConnectorPDL, ConnectorPublic};
 
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
 
use super::inbox::PublicInbox;
 
use super::scheduler::Router;
 
@@ -8,6 +8,7 @@ use super::scheduler::Router;
 
use std::ptr;
 
use std::sync::{Barrier, RwLock, RwLockReadGuard};
 
use std::sync::atomic::AtomicBool;
 
use crate::runtime2::native::Connector;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
@@ -45,8 +46,15 @@ impl ConnectorId {
 
    }
 
}
 

	
 
// TODO: Change this, I hate this. But I also don't want to put `public` and
 
//  `router` of `ScheduledConnector` back into `Connector`.
 
pub enum ConnectorVariant {
 
    UserDefined(ConnectorPDL),
 
    Native(Box<dyn Connector>),
 
}
 

	
 
pub struct ScheduledConnector {
 
    pub connector: Connector,
 
    pub connector: ConnectorVariant,
 
    pub public: ConnectorPublic,
 
    pub router: Router
 
}
 
@@ -100,7 +108,7 @@ impl ConnectorStore {
 

	
 
    /// Create a new connector, returning the key that can be used to retrieve
 
    /// and/or queue it.
 
    pub(crate) fn create(&self, connector: Connector) -> ConnectorKey {
 
    pub(crate) fn create(&self, connector: ConnectorVariant) -> ConnectorKey {
 
        let lock = self.inner.write().unwrap();
 
        let connector = ScheduledConnector{
 
            connector,
 
@@ -189,17 +197,12 @@ impl PortStore {
 
        }
 
    }
 

	
 
    pub(crate) fn create_channel(&self, creating_connector: Option<ConnectorId>) -> Channel {
 
    pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> Channel {
 
        let mut lock = self.inner.write().unwrap();
 

	
 
        // Reserves a new port. Doesn't point it to its counterpart
 
        fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option<ConnectorId>) -> u32 {
 
        fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: ConnectorId) -> u32 {
 
            let index;
 
            let (ownership, connector_id) = if creating_connector.is_some() {
 
                (PortOwnership::Owned, creating_connector.unwrap())
 
            } else {
 
                (PortOwnership::Unowned, ConnectorId::new_invalid())
 
            };
 

	
 
            if lock.free.is_empty() {
 
                index = lock.ports.len() as u32;
 
@@ -207,7 +210,7 @@ impl PortStore {
 
                    self_id: PortIdLocal::new(index),
 
                    peer_id: PortIdLocal::new_invalid(),
 
                    kind,
 
                    ownership,
 
                    ownership: PortOwnership::Owned,
 
                    owning_connector: connector_id,
 
                    peer_connector: connector_id
 
                });
 
@@ -217,7 +220,7 @@ impl PortStore {
 

	
 
                port.peer_id = PortIdLocal::new_invalid();
 
                port.kind = kind;
 
                port.ownership = ownership;
 
                port.ownership = PortOwnership::Owned;
 
                port.owning_connector = connector_id;
 
                port.peer_connector = connector_id;
 
            }
 
@@ -238,7 +241,10 @@ impl PortStore {
 
            getter_port.peer_id = putter_port.self_id;
 
        }
 

	
 
        return Channel{ putter_id, getter_id }
 
        return Channel{
 
            putter_id: PortIdLocal::new(putter_id),
 
            getter_id: PortIdLocal::new(getter_id),
 
        }
 
    }
 
}
 

	
src/runtime2/inbox.rs
Show inline comments
 
@@ -42,6 +42,29 @@ pub struct DataMessage {
 
    pub message: ValueGroup,
 
}
 

	
 
pub enum SyncBranchConstraint {
 
    SilentPort(PortIdLocal),
 
    BranchNumber(u32),
 
    PortMapping(PortIdLocal, u32),
 
}
 

	
 
pub struct SyncConnectorSolution {
 
    connector_id: ConnectorId,
 
    terminating_branch_id: BranchId,
 
    execution_branch_ids: Vec<BranchId>, // ends with terminating branch ID
 
}
 

	
 
pub struct SyncConnectorConstraints {
 
    connector_id: ConnectorId,
 
    constraints: Vec<SyncBranchConstraint>,
 
}
 

	
 
pub struct SyncMessage {
 
    connector_solutions: Vec<SyncConnectorSolution>,
 
    connector_constraints: Vec<SyncConnectorConstraints>,
 
    connectors_to_visit: Vec<u32>,
 
}
 

	
 
/// A control message. These might be sent by the scheduler to notify eachother
 
/// of asynchronous state changes.
 
pub struct ControlMessage {
 
@@ -59,8 +82,10 @@ pub enum ControlMessageVariant {
 
/// out and handles all control message and potential routing). The correctly
 
/// addressed `Data` variants will end up at the connector.
 
pub enum Message {
 
    Data(DataMessage),
 
    Control(ControlMessage),
 
    Data(DataMessage),          // data message, handled by connector
 
    Sync(SyncMessage),          // sync message, handled by both connector/scheduler
 
    Control(ControlMessage),    // control message, handled by scheduler
 
    Ping,                       // ping message, intentionally waking up a connector (used for native connectors)
 
}
 

	
 
/// The public inbox of a connector. The thread running the connector that owns
src/runtime2/mod.rs
Show inline comments
 
@@ -3,6 +3,7 @@
 
mod runtime;
 
mod messages;
 
mod connector;
 
mod native;
 
mod port;
 
mod global_store;
 
mod scheduler;
 
@@ -12,35 +13,50 @@ mod inbox;
 

	
 
// Imports
 

	
 
use std::sync::Arc;
 
use std::sync::{Arc, Mutex};
 
use std::sync::atomic::Ordering;
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::protocol::eval::*;
 
use crate::{common::Id, PortId, ProtocolDescription};
 

	
 
use global_store::GlobalStore;
 
use global_store::{ConnectorVariant, GlobalStore};
 
use scheduler::Scheduler;
 
use crate::protocol::ComponentCreationError;
 
use crate::runtime2::connector::{Branch, Connector, find_ports_in_value_group};
 
use connector::{Branch, ConnectorPDL, find_ports_in_value_group};
 
use native::{ConnectorApplication, ApplicationInterface};
 

	
 

	
 
// Runtime API
 
// TODO: Exit condition is very dirty. Take into account:
 
//  - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working
 
//  - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done
 
//  - User-owned interfaces: As long as these are owned user may still decide to create new connectors.
 
pub struct Runtime {
 
    global_store: Arc<GlobalStore>,
 
    protocol_description: Arc<ProtocolDescription>,
 
    schedulers: Vec<JoinHandle<()>>
 
    inner: Arc<RuntimeInner>,
 
}
 

	
 
pub(crate) struct RuntimeInner {
 
    pub(crate) global_store: GlobalStore,
 
    pub(crate) protocol_description: ProtocolDescription,
 
    schedulers: Mutex<Vec<JoinHandle<()>>>, // TODO: Revise, make exit condition something like: all interfaces dropped
 
}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: usize, protocol_description: Arc<ProtocolDescription>) -> Runtime {
 
    pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime {
 
        // Setup global state
 
        assert!(num_threads > 0, "need a thread to run connectors");
 
        let global_store = Arc::new(GlobalStore::new());
 
        let runtime_inner = Arc::new(RuntimeInner{
 
            global_store: GlobalStore::new(),
 
            protocol_description,
 
            schedulers: Mutex::new(Vec::new()),
 
        });
 

	
 
        // Launch threads
 
        {
 
            let mut schedulers = Vec::with_capacity(num_threads);
 
            for _ in 0..num_threads {
 
            let mut scheduler = Scheduler::new(global_store.clone(), protocol_description.clone());
 
                let mut scheduler = Scheduler::new(runtime_inner.clone());
 
                let thread = thread::spawn(move || {
 
                    scheduler.run();
 
                });
 
@@ -48,51 +64,35 @@ impl Runtime {
 
                schedulers.push(thread);
 
            }
 

	
 
        // Move innards into runtime struct
 
        return Runtime{
 
            global_store,
 
            protocol_description,
 
            schedulers,
 
        }
 
            let mut lock = runtime_inner.schedulers.lock().unwrap();
 
            *lock = schedulers;
 
        }
 

	
 
    /// Returns (putter port, getter port)
 
    pub fn create_channel(&self) -> (Value, Value) {
 
        let channel = self.global_store.ports.create_channel(None);
 
        let putter_value = Value::Output(PortId(Id{
 
            connector_id: u32::MAX,
 
            u32_suffix: channel.putter_id,
 
        }));
 
        let getter_value = Value::Input(PortId(Id{
 
            connector_id: u32::MAX,
 
            u32_suffix: channel.getter_id,
 
        }));
 
        return (putter_value, getter_value);
 
        // Return runtime
 
        return Runtime{ inner: runtime_inner };
 
    }
 

	
 
    pub fn create_connector(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), ComponentCreationError> {
 
        // TODO: Remove component creation function from PD, should not be concerned with it
 
        // Create the connector and mark the ports as now owned by the
 
        // connector
 
        let mut port_ids = Vec::new();
 
        find_ports_in_value_group(&values, &mut port_ids);
 

	
 
        let component_state = self.protocol_description.new_component_v2(module.as_bytes(), procedure.as_bytes(), values)?;
 
        let connector = Connector::new(0, Branch::new_initial_branch(component_state), port_ids.clone());
 
        let connector_key = self.global_store.connectors.create(connector);
 

	
 
        for port_id in port_ids {
 
            let port = self.global_store.ports.get(&connector_key, port_id);
 
            port.owning_connector = connector_key.downcast();
 
            port.peer_connector
 
            // TODO: Note that we immediately need to notify the other side of the connector that
 
            //  the port has moved!
 
        }
 
    /// Returns a new interface through which channels and connectors can be
 
    /// created.
 
    pub fn create_interface(&self) -> ApplicationInterface {
 
        let (connector, mut interface) = ConnectorApplication::new(self.inner.clone());
 
        let connector = Box::new(connector);
 

	
 
        let connector_key = self.global_store.connectors.create(ConnectorVariant::Native(connector));
 
        interface.set_connector_id(connector_key.downcast());
 

	
 
        // Note that we're not scheduling. That is done by the interface in case
 
        // it is actually needed.
 
        return interface;
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 

	
 
        self.inner.global_store.should_exit.store(true, Ordering::Release);
 
        let mut schedulers = self.inner.schedulers.lock().unwrap();
 
        for scheduler in schedulers.drain(..) {
 
            scheduler.join();
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/native.rs
Show inline comments
 
new file 100644
 
use std::sync::{Arc, Mutex, Condvar};
 
use std::cell::Cell;
 
use std::sync::atomic::Ordering;
 
use crate::protocol::ComponentCreationError;
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::ProtocolDescription;
 
use crate::runtime2::connector::{Branch, find_ports_in_value_group};
 
use crate::runtime2::global_store::ConnectorKey;
 

	
 
use super::RuntimeInner;
 
use super::global_store::{ConnectorVariant, ConnectorId};
 
use super::port::{Channel, PortIdLocal};
 
use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState};
 
use super::inbox::{Message, DataMessage, SyncMessage};
 

	
 
pub trait Connector {
 
    fn insert_data_message(&mut self, message: DataMessage);
 
    fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState);
 
    fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 
}
 

	
 
type SyncDone = Arc<(Mutex<bool>, Condvar)>;
 
type JobQueue = Arc<Mutex<Vec<ApplicationJob>>>,
 

	
 
enum ApplicationJob {
 
    NewConnector(ConnectorPDL),
 
}
 

	
 
/// The connector which an application can directly interface with. Once may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
pub struct ConnectorApplication {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(false), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(Vec::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn insert_sync_message(&mut self, message: SyncMessage, delta_state: &mut RunDeltaState) {
 
        todo!("handling sync messages in ApplicationConnector");
 
    }
 

	
 
    fn insert_data_message(&mut self, message: DataMessage)  {
 
        todo!("handling messages in ApplicationConnector");
 
    }
 

	
 
    fn run(&mut self, protocol_description: &ProtocolDescription, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop() {
 
            match job {
 
                ApplicationJob::NewConnector(connector) => {
 
                    delta_state.new_connectors.push(connector);
 
                }
 
            }
 
        }
 

	
 
        return ConnectorScheduling::NotNow;
 
    }
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<PortIdLocal>,
 
}
 

	
 
impl ApplicationInterface {
 
    pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner1>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel.
 
    pub fn create_channel(&mut self) -> Channel {
 
        let channel = self.runtime.global_store.ports.create_channel(self.connector_id);
 
        self.owned_ports.push(channel.putter_id);
 
        self.owned_ports.push(channel.getter_id);
 

	
 
        return channel;
 
    }
 

	
 
    /// Creates a new connector. Note that it is not scheduled immediately, but
 
    /// depends on the `ApplicationConnector` to run, followed by the created
 
    /// connector being scheduled.
 
    // TODO: Optimize by yanking out scheduler logic for common use.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for port_to_remove in &initial_ports {
 
            match self.owned_ports.iter().position(|v| v == port_to_remove) {
 
                Some(index_to_remove) => {
 
                    // We own the port, so continue
 
                    self.owned_ports.remove(index_to_remove)
 
                },
 
                None => {
 
                    // We don't own the port
 
                    return Err(ComponentCreationError::UnownedPort);
 
                }
 
            }
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(0, Branch::new_initial_branch(state), initial_ports);
 

	
 
        // Put on job queue
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            queue.push(ApplicationJob::NewConnector(connector));
 
        }
 

	
 
        // Send ping message to wake up connector
 
        let connector = self.runtime.global_store.connectors.get_shared(self.connector_id);
 
        connector.inbox.insert_message(Message::Ping);
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
 
            self.runtime.global_store.connector_queue.push_back(key);
 
        }
 

	
 
        return Ok(());
 
    }
 

	
 
    /// Check if the next sync-round is finished.
 
    pub fn try_wait(&self) -> bool {
 
        let (is_done, _) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        return *lock;
 
    }
 

	
 
    /// Wait until the next sync-round is finished
 
    pub fn wait(&self) {
 
        let (is_done, condition) = &*self.sync_done;
 
        let lock = is_done.lock().unwrap();
 
        condition.wait_while(lock, |v| !*v); // wait while not done
 
    }
 

	
 
    /// Called by runtime to set associated connector's ID.
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        self.connector_id = id;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/port.rs
Show inline comments
 
@@ -50,6 +50,6 @@ pub struct Port {
 

	
 
// TODO: Turn port ID into its own type
 
pub struct Channel {
 
    pub putter_id: u32, // can put on it, so from the connector's point of view, this is an output
 
    pub getter_id: u32, // vice versa: can get on it, so an input for the connector
 
    pub putter_id: PortIdLocal, // can put on it, so from the connector's point of view, this is an output
 
    pub getter_id: PortIdLocal, // vice versa: can get on it, so an input for the connector
 
}
 
\ No newline at end of file
src/runtime2/scheduler.rs
Show inline comments
 
@@ -5,23 +5,21 @@ use std::time::Duration;
 
use std::thread;
 

	
 
use crate::ProtocolDescription;
 
use crate::runtime2::global_store::ConnectorVariant;
 

	
 
use super::RuntimeInner;
 
use super::port::{PortIdLocal};
 
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
 
use super::connector::{Connector, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
 

	
 
pub(crate) struct Scheduler {
 
    global: Arc<GlobalStore>,
 
    code: Arc<ProtocolDescription>,
 
    runtime: Arc<RuntimeInner>,
 
}
 

	
 
impl Scheduler {
 
    pub fn new(global: Arc<GlobalStore>, code: Arc<ProtocolDescription>) -> Self {
 
        Self{
 
            global,
 
            code,
 
        }
 
    pub fn new(runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{ runtime };
 
    }
 

	
 
    pub fn run(&mut self) {
 
@@ -31,12 +29,12 @@ impl Scheduler {
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            let connector_key = self.global.connector_queue.pop_front();
 
            let connector_key = self.runtime.global_store.connector_queue.pop_front();
 
            if connector_key.is_none() {
 
                // TODO: @Performance, needs condition or something, and most
 
                //  def' not sleeping
 
                thread::sleep(Duration::new(1, 0));
 
                if self.global.should_exit.load(Ordering::Acquire) {
 
                if self.runtime.global_store.should_exit.load(Ordering::Acquire) {
 
                    // Thread exits!
 
                    break 'thread_loop;
 
                }
 
@@ -46,7 +44,7 @@ impl Scheduler {
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let scheduled = self.global.connectors.get_mut(&connector_key);
 
            let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
@@ -68,7 +66,7 @@ impl Scheduler {
 
                            match message.content {
 
                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                    // Need to change port target
 
                                    let port = self.global.ports.get(&connector_key, port_id);
 
                                    let port = self.runtime.global_store.ports.get(&connector_key, port_id);
 
                                    port.peer_connector = new_target_connector_id;
 
                                    debug_assert!(delta_state.outbox.is_empty());
 

	
 
@@ -86,23 +84,32 @@ impl Scheduler {
 
                                    scheduled.router.handle_ack(message.id);
 
                                }
 
                            }
 
                        }
 
                        },
 
                        Message::Ping => {},
 
                    }
 
                }
 

	
 
                // Actually run the connector
 
                // TODO: Revise
 
                let new_schedule;
 
                if scheduled.connector.is_in_sync_mode() {
 
                match &mut scheduled.connector {
 
                    ConnectorVariant::UserDefined(connector) => {
 
                        if connector.is_in_sync_mode() {
 
                            // In synchronous mode, so we can expect messages being sent,
 
                            // but we never expect the creation of connectors
 
                    new_schedule = scheduled.connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
 
                            new_schedule = connector.run_in_speculative_mode(&self.runtime.protocol_description, &mut delta_state);
 
                            debug_assert!(delta_state.new_connectors.is_empty());
 
                        } else {
 
                            // In regular running mode (not in a sync block) we cannot send
 
                            // messages but we can create new connectors
 
                    new_schedule = scheduled.connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
 
                            new_schedule = connector.run_in_deterministic_mode(&self.runtime.protocol_description, &mut delta_state);
 
                            debug_assert!(delta_state.outbox.is_empty());
 
                        }
 
                    },
 
                    ConnectorVariant::Native(connector) => {
 
                        new_schedule = connector.run(&self.runtime.protocol_description);
 
                    },
 
                }
 

	
 
                // Handle all of the output from the current run: messages to
 
                // send and connectors to instantiate.
 
@@ -118,7 +125,7 @@ impl Scheduler {
 
                ConnectorScheduling::Immediate => unreachable!(),
 
                ConnectorScheduling::Later => {
 
                    // Simply queue it again later
 
                    self.global.connector_queue.push_back(connector_key);
 
                    self.runtime.global_store.connector_queue.push_back(connector_key);
 
                },
 
                ConnectorScheduling::NotNow => {
 
                    // Need to sleep, note that we are the only ones which are
 
@@ -136,7 +143,7 @@ impl Scheduler {
 
                            .is_ok();
 

	
 
                        if should_reschedule_self {
 
                            self.global.connector_queue.push_back(connector_key);
 
                            self.runtime.global_store.connector_queue.push_back(connector_key);
 
                        }
 
                    }
 
                }
 
@@ -149,7 +156,7 @@ impl Scheduler {
 
        if !delta_state.outbox.is_empty() {
 
            for message in delta_state.outbox.drain(..) {
 
                let (inbox_message, target_connector_id) = {
 
                    let sending_port = self.global.ports.get(&connector_key, message.sending_port);
 
                    let sending_port = self.runtime.global_store.ports.get(&connector_key, message.sending_port);
 
                    (
 
                        DataMessage {
 
                            sending_connector: connector_key.downcast(),
 
@@ -170,12 +177,12 @@ impl Scheduler {
 
        // Handling any new connectors that were scheduled
 
        // TODO: Pool outgoing messages to reduce atomic access
 
        if !delta_state.new_connectors.is_empty() {
 
            let cur_connector = self.global.connectors.get_mut(connector_key);
 
            let cur_connector = self.runtime.global_store.connectors.get_mut(connector_key);
 

	
 
            for new_connector in delta_state.new_connectors.drain(..) {
 
                // Add to global registry to obtain key
 
                let new_key = self.global.connectors.create(new_connector);
 
                let new_connector = self.global.connectors.get_mut(&new_key);
 
                let new_key = self.runtime.global_store.connectors.create(ConnectorVariant::UserDefined(new_connector));
 
                let new_connector = self.runtime.global_store.connectors.get_mut(&new_key);
 

	
 
                // Each port should be lost by the connector that created the
 
                // new one. Note that the creator is the current owner.
 
@@ -184,7 +191,7 @@ impl Scheduler {
 

	
 
                    // Modify ownership, retrieve peer connector
 
                    let (peer_connector_id, peer_port_id) = {
 
                        let mut port = self.global.ports.get(connector_key, *port_id);
 
                        let mut port = self.runtime.global_store.ports.get(connector_key, *port_id);
 
                        port.owning_connector = new_key.downcast();
 

	
 
                        (port.peer_connector, port.peer_id)
 
@@ -199,13 +206,13 @@ impl Scheduler {
 
                }
 

	
 
                // Schedule new connector to run
 
                self.global.connector_queue.push_back(new_key);
 
                self.runtime.global_store.connector_queue.push_back(new_key);
 
            }
 
        }
 
    }
 

	
 
    pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
 
        let connector = self.global.connectors.get_shared(connector_id);
 
        let connector = self.runtime.global_store.connectors.get_shared(connector_id);
 

	
 
        connector.inbox.insert_message(message);
 
        let should_wake_up = connector.sleeping
 
@@ -214,7 +221,7 @@ impl Scheduler {
 

	
 
        if should_wake_up {
 
            let key = unsafe { ConnectorKey::from_id(connector_id) };
 
            self.global.connector_queue.push_back(key);
 
            self.runtime.global_store.connector_queue.push_back(key);
 
        }
 
    }
 
}
0 comments (0 inline, 0 general)