Changeset - daf15df0f8ca
[Not reviewed]
0 9 0
MH - 4 years ago 2021-10-11 12:24:44
contact@maxhenger.nl
scaffolding in place for scheduler/runtime
9 files changed with 656 insertions and 163 deletions:
0 comments (0 inline, 0 general)
src/protocol/mod.rs
Show inline comments
 
@@ -44,6 +44,15 @@ pub(crate) enum EvalContext<'a> {
 
}
 
//////////////////////////////////////////////
 

	
 
#[derive(Debug)]
 
pub enum ComponentCreationError {
 
    ModuleDoesntExist,
 
    DefinitionDoesntExist,
 
    DefinitionNotComponent,
 
    InvalidNumArguments,
 
    InvalidArgumentType(usize),
 
}
 

	
 
impl std::fmt::Debug for ProtocolDescription {
 
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 
        write!(f, "(An opaque protocol description)")
 
@@ -79,6 +88,8 @@ impl ProtocolDescription {
 
            pool: Mutex::new(parser.string_pool),
 
        });
 
    }
 

	
 
    #[deprecated]
 
    pub(crate) fn component_polarities(
 
        &self,
 
        module_name: &[u8],
 
@@ -130,7 +141,9 @@ impl ProtocolDescription {
 
        }
 
        Ok(result)
 
    }
 

	
 
    // expects port polarities to be correct
 
    #[deprecated]
 
    pub(crate) fn new_component(&self, module_name: &[u8], identifier: &[u8], ports: &[PortId]) -> ComponentState {
 
        let mut args = Vec::new();
 
        for (&x, y) in ports.iter().zip(self.component_polarities(module_name, identifier).unwrap()) {
 
@@ -147,6 +160,59 @@ impl ProtocolDescription {
 
        ComponentState { prompt: Prompt::new(&self.types, &self.heap, def, 0, ValueGroup::new_stack(args)) }
 
    }
 

	
 
    // TODO: Ofcourse, rename this at some point, perhaps even remove it in its
 
    //  entirety. Find some way to interface with the parameter's types.
 
    pub(crate) fn new_component_v2(
 
        &self, module_name: &[u8], identifier: &[u8], arguments: ValueGroup
 
    ) -> Result<ComponentState, ComponentCreationError> {
 
        // Find the module in which the definition can be found
 
        let module_root = self.lookup_module_root(module_name);
 
        if module_root.is_none() {
 
            return Err(ComponentCreationError::ModuleDoesntExist);
 
        }
 
        let module_root = module_root.unwrap();
 

	
 
        let root = &self.heap[module_root];
 
        let definition_id = root.get_definition_ident(&heap, identifier);
 
        if definition_id.is_none() {
 
            return Err(ComponentCreationError::DefinitionDoesntExist);
 
        }
 
        let definition_id = definition_id.unwrap();
 

	
 
        let definition = &self.heap[definition_id];
 
        if !definition.is_component() {
 
            return Err(ComponentCreationError::DefinitionNotComponent);
 
        }
 

	
 
        // Make sure that the types of the provided value group matches that of
 
        // the expected types.
 
        let definition = definition.as_component();
 
        if !definition.poly_vars.is_empty() {
 
            return Err(ComponentCreationError::DefinitionNotComponent);
 
        }
 

	
 
        // - check number of arguments
 
        let expr_data = self.types.get_procedure_expression_data(&definition_id, 0);
 
        if expr_data.arg_types.len() != arguments.values.len() {
 
            return Err(ComponentCreationError::InvalidNumArguments);
 
        }
 

	
 
        // - for each argument try to make sure the types match
 
        for arg_idx in 0..arguments.values.len() {
 
            let expected_type = &expr_data.arg_types[arg_idx];
 
            let provided_value = &arguments.values[arg_idx];
 
            if !self.verify_same_type(expected_type, 0, &arguments, provided_value) {
 
                return Err(ComponentCreationError::InvalidArgumentType(arg_idx));
 
            }
 
        }
 

	
 
        // By now we're sure that all of the arguments are correct. So create
 
        // the connector.
 
        return Ok(ComponentState{
 
            prompt: Prompt::new(&self.types, &self.heap, def, 0, arguments),
 
        });
 
    }
 

	
 
    fn lookup_module_root(&self, module_name: &[u8]) -> Option<RootId> {
 
        for module in self.modules.iter() {
 
            match &module.name {
 
@@ -161,6 +227,63 @@ impl ProtocolDescription {
 

	
 
        return None;
 
    }
 

	
 
    fn verify_same_type(&self, expected: &ConcreteType, expected_idx: usize, arguments: &ValueGroup, argument: &Value) -> bool {
 
        use ConcreteTypePart as CTP;
 

	
 
        macro_rules! match_variant {
 
            ($value:expr, $variant:expr) => {
 
                if let $variant(_) = $value { true } else { false }
 
            };
 
        }
 

	
 
        match &expected.parts[expected_idx] {
 
            CTP::Void | CTP::Message | CTP::Slice | CTP::Function(_, _) | CTP::Component(_, _) => unreachable!(),
 
            CTP::Bool => match_variant!(argument, Value::Bool),
 
            CTP::UInt8 => match_variant!(argument, Value::UInt8),
 
            CTP::UInt16 => match_variant!(argument, Value::UInt16),
 
            CTP::UInt32 => match_variant!(argument, Value::UInt32),
 
            CTP::UInt64 => match_variant!(argument, Value::UInt64),
 
            CTP::SInt8 => match_variant!(argument, Value::SInt8),
 
            CTP::SInt16 => match_variant!(argument, Value::SInt16),
 
            CTP::SInt32 => match_variant!(argument, Value::SInt32),
 
            CTP::SInt64 => match_variant!(argument, Value::SInt64),
 
            CTP::Character => match_variant!(argument, Value::Char),
 
            CTP::String => {
 
                // Match outer string type and embedded character types
 
                if let Value::String(heap_pos) = argument {
 
                    for element in &arguments.regions[*heap_pos as usize] {
 
                        if let Value::Char(_) = element {} else {
 
                            return false;
 
                        }
 
                    }
 
                } else {
 
                    return false;
 
                }
 

	
 
                return true;
 
            },
 
            CTP::Array => {
 
                if let Value::Array(heap_pos) = argument {
 
                    let heap_pos = *heap_pos;
 
                    for element in &arguments.regions[heap_pos as usize] {
 
                        if !self.verify_same_type(expected, expected_idx + 1, arguments, element) {
 
                            return false;
 
                        }
 
                    }
 
                    return true;
 
                } else {
 
                    return false;
 
                }
 
            },
 
            CTP::Input => match_variant!(argument, Value::Input),
 
            CTP::Output => match_variant!(argument, Value::Output),
 
            CTP::Instance(_definition_id, _num_embedded) => {
 
                todo!("implement full type checking on user-supplied arguments");
 
                return false;
 
            },
 
        }
 
    }
 
}
 

	
 
// TODO: @temp Should just become a concrete thing that is passed in
src/protocol/parser/pass_typing.rs
Show inline comments
 
@@ -1550,14 +1550,31 @@ impl PassTyping {
 

	
 
        // Every expression checked, and new monomorphs are queued. Transfer the
 
        // expression information to the type table.
 
        let definition_id = match &self.definition_type {
 
            DefinitionType::Component(id) => id.upcast(),
 
            DefinitionType::Function(id) => id.upcast(),
 
        let (definition_id, procedure_arguments) = match &self.definition_type {
 
            DefinitionType::Component(id) => {
 
                let definition = &ctx.heap[*id];
 
                (id.upcast(), &definition.parameters)
 
            },
 
            DefinitionType::Function(id) => {
 
                let definition = &ctx.heap[*id];
 
                (id.upcast(), &definition.parameters)
 
            },
 
        };
 

	
 
        let target = ctx.types.get_procedure_expression_data_mut(&definition_id, self.reserved_idx);
 
        debug_assert!(target.expr_data.is_empty()); // makes sure we never queue something twice
 
        debug_assert!(target.arg_types.is_empty()); // makes sure we never queue a procedure's type inferencing twice
 
        debug_assert!(target.expr_data.is_empty());
 

	
 
        // - Write the arguments to the procedure
 
        target.arg_types.reserve(procedure_arguments.len());
 
        for argument_id in procedure_arguments {
 
            let mut concrete = ConcreteType::default();
 
            let argument_type = self.var_types.get(argument_id).unwrap();
 
            argument_type.var_type.write_concrete_type(&mut concrete);
 
            target.arg_types.push(concrete);
 
        }
 

	
 
        // - Write the expression data
 
        target.expr_data.reserve(self.expr_types.len());
 
        for infer_expr in self.expr_types.iter() {
 
            let mut concrete = ConcreteType::default();
src/protocol/parser/type_table.rs
Show inline comments
 
@@ -320,6 +320,7 @@ pub struct PolymorphicVariable {
 
pub struct ProcedureMonomorph {
 
    // Expression data for one particular monomorph
 
    pub concrete_type: ConcreteType,
 
    pub arg_types: Vec<ConcreteType>,
 
    pub expr_data: Vec<MonomorphExpression>,
 
}
 

	
 
@@ -647,6 +648,7 @@ impl TypeTable {
 
        let mono_idx = mono_types.len();
 
        mono_types.push(ProcedureMonomorph{
 
            concrete_type,
 
            arg_types: Vec::new(),
 
            expr_data: Vec::new(),
 
        });
 

	
src/runtime2/connector.rs
Show inline comments
 
use std::collections::HashMap;
 
use std::sync::atomic::AtomicBool;
 

	
 
use crate::{PortId, ProtocolDescription};
 
use crate::protocol::{ComponentState, RunContext, RunResult};
 
use crate::protocol::eval::{Prompt, Value, ValueGroup};
 
use crate::runtime2::inbox::{Inbox, OutboxMessage};
 
use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage};
 
use crate::runtime2::port::PortIdLocal;
 

	
 
/// Represents the identifier of a branch (the index within its container). An
 
@@ -51,14 +52,14 @@ pub(crate) struct Branch {
 
    sync_state: SpeculativeState,
 
    next_branch_in_queue: Option<u32>,
 
    // Message/port state
 
    inbox: HashMap<PortIdLocal, OutboxMessage>, // TODO: @temporary, remove together with fires()
 
    inbox: HashMap<PortIdLocal, OutgoingMessage>, // TODO: @temporary, remove together with fires()
 
    ports_delta: Vec<PortOwnershipDelta>,
 
}
 

	
 
impl Branch {
 
    /// Constructs a non-sync branch. It is assumed that the code is at the
 
    /// first instruction
 
    fn new_initial_branch(component_state: ComponentState) -> Self {
 
    pub(crate) fn new_initial_branch(component_state: ComponentState) -> Self {
 
        Branch{
 
            index: BranchId::new_invalid(),
 
            parent_index: BranchId::new_invalid(),
 
@@ -135,13 +136,13 @@ enum PortOwnershipError {
 
/// As the name implies, this contains a description of the ports associated
 
/// with a connector.
 
/// TODO: Extend documentation
 
struct ConnectorPorts {
 
pub(crate) struct ConnectorPorts {
 
    // Essentially a mapping from `port_index` to `port_id`.
 
    owned_ports: Vec<PortIdLocal>,
 
    pub owned_ports: Vec<PortIdLocal>,
 
    // Contains P*B entries, where P is the number of ports and B is the number
 
    // of branches. One can find the appropriate mapping of port p at branch b
 
    // at linear index `b*P+p`.
 
    port_mapping: Vec<PortAssignment>
 
    pub port_mapping: Vec<PortAssignment>
 
}
 

	
 
impl ConnectorPorts {
 
@@ -246,22 +247,23 @@ impl BranchQueue {
 
}
 

	
 
/// Public fields of the connector that can be freely shared between multiple
 
/// threads. Note that this is not enforced by the compiler. The global store
 
/// allows retrieving the entire `Connector` as a mutable reference by one
 
/// thread, and this `ConnectorPublic` by any number of threads.
 
/// threads.
 
pub(crate) struct ConnectorPublic {
 
    pub inbox: Inbox,
 
    pub inbox: PublicInbox,
 
    pub sleeping: AtomicBool,
 
}
 

	
 
impl ConnectorPublic {
 
    pub fn new() -> Self {
 
        ConnectorPublic{
 
            inbox: Inbox::new(),
 
            inbox: PublicInbox::new(),
 
            sleeping: AtomicBool::new(false),
 
        }
 
    }
 
}
 

	
 
// TODO: Maybe prevent false sharing by aligning `public` to next cache line.
 
// TODO: Do this outside of the connector, create a wrapping struct
 
pub(crate) struct Connector {
 
    // State and properties of connector itself
 
    id: u32,
 
@@ -272,8 +274,8 @@ pub(crate) struct Connector {
 
    sync_pending_get: BranchQueue,
 
    sync_finished: BranchQueue,
 
    // Port/message management
 
    pub inbox: PrivateInbox,
 
    pub ports: ConnectorPorts,
 
    pub public: ConnectorPublic,
 
}
 

	
 
struct TempCtx {}
 
@@ -307,8 +309,8 @@ impl Connector {
 
            sync_active: BranchQueue::new(),
 
            sync_pending_get: BranchQueue::new(),
 
            sync_finished: BranchQueue::new(),
 
            inbox: PrivateInbox::new(),
 
            ports: ConnectorPorts::new(owned_ports),
 
            public: ConnectorPublic::new(),
 
        }
 
    }
 

	
 
@@ -471,7 +473,7 @@ impl Connector {
 
                    // Put in run results for thread to pick up and transfer to
 
                    // the correct connector inbox.
 
                    port_mapping.mark_definitive(branch.index, 1);
 
                    let message = OutboxMessage {
 
                    let message = OutgoingMessage {
 
                        sending_port: local_port_id,
 
                        sender_prev_branch_id: BranchId::new_invalid(),
 
                        sender_cur_branch_id: branch.index,
 
@@ -709,7 +711,7 @@ impl Connector {
 
pub(crate) struct RunDeltaState {
 
    // Variables that allow the thread running the connector to pick up global
 
    // state changes and try to apply them.
 
    pub outbox: Vec<OutboxMessage>,
 
    pub outbox: Vec<OutgoingMessage>,
 
    pub new_connectors: Vec<Connector>,
 
    // Workspaces
 
    pub ports: Vec<PortIdLocal>,
 
@@ -736,7 +738,7 @@ pub(crate) enum ConnectorScheduling {
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
src/runtime2/global_store.rs
Show inline comments
 
@@ -2,13 +2,53 @@ use crate::collections::{MpmcQueue, RawVec};
 

	
 
use super::connector::{Connector, ConnectorPublic};
 
use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
 
use super::inbox::PublicInbox;
 
use super::scheduler::Router;
 

	
 
use std::ptr;
 
use std::sync::{RwLock, RwLockReadGuard};
 
use std::sync::{Barrier, RwLock, RwLockReadGuard};
 
use std::sync::atomic::AtomicBool;
 

	
 
/// A kind of token that, once obtained, allows mutable access to a connector.
 
/// We're trying to use move semantics as much as possible: the owner of this
 
/// key is the only one that may execute the connector's code.
 
pub(crate) struct ConnectorKey {
 
    pub index: u32, // of connector
 
}
 

	
 
impl ConnectorKey {
 
    /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable
 
    /// access, to a "regular ID" which can be used to obtain immutable access.
 
    #[inline]
 
    pub fn downcast(&self) -> ConnectorId {
 
        return ConnectorId(self.index);
 
    }
 

	
 
    /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it
 
    /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system
 
    #[inline]
 
    pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey {
 
        return ConnectorKey{ index: id.0 };
 
    }
 
}
 

	
 
/// A kind of token that, once obtained, allows access to a container.
 
struct ConnectorKey {
 
    index: u32, // of connector
 
/// A kind of token that allows shared access to a connector. Multiple threads
 
/// may hold this
 
#[derive(Copy, Clone)]
 
pub(crate) struct ConnectorId(u32);
 

	
 
impl ConnectorId {
 
    // TODO: Like the other `new_invalid`, maybe remove
 
    #[inline]
 
    pub fn new_invalid() -> ConnectorId {
 
        return ConnectorId(u32::MAX);
 
    }
 
}
 

	
 
pub struct ScheduledConnector {
 
    pub connector: Connector,
 
    pub public: ConnectorPublic,
 
    pub router: Router
 
}
 

	
 
/// The registry containing all connectors. The idea here is that when someone
 
@@ -21,7 +61,7 @@ struct ConnectorStore {
 
}
 

	
 
struct ConnectorStoreInner {
 
    connectors: RawVec<*mut Connector>,
 
    connectors: RawVec<*mut ScheduledConnector>,
 
    free: Vec<usize>,
 
}
 

	
 
@@ -36,11 +76,11 @@ impl ConnectorStore {
 
    }
 

	
 
    /// Retrieves the shared members of the connector.
 
    pub(crate) fn get_shared(&self, connector_id: u32) -> &'static ConnectorPublic {
 
    pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
            let connector = lock.connectors.get(connector_id as usize);
 
            let connector = lock.connectors.get(connector_id.0 as usize);
 
            debug_assert!(!connector.is_null());
 
            return &*connector.public;
 
        }
 
@@ -48,7 +88,7 @@ impl ConnectorStore {
 

	
 
    /// Retrieves a particular connector. Only the thread that pulled the
 
    /// associated key out of the execution queue should (be able to) call this.
 
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector {
 
    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector {
 
        let lock = self.inner.read().unwrap();
 

	
 
        unsafe {
 
@@ -62,6 +102,11 @@ impl ConnectorStore {
 
    /// and/or queue it.
 
    pub(crate) fn create(&self, connector: Connector) -> ConnectorKey {
 
        let lock = self.inner.write().unwrap();
 
        let connector = ScheduledConnector{
 
            connector,
 
            public: ConnectorPublic::new(),
 
            router: Router::new(),
 
        };
 

	
 
        let index;
 
        if lock.free.is_empty() {
 
@@ -144,14 +189,17 @@ impl PortStore {
 
        }
 
    }
 

	
 
    pub(crate) fn create_channel(&self, creating_connector: Option<u32>) -> Channel {
 
    pub(crate) fn create_channel(&self, creating_connector: Option<ConnectorId>) -> Channel {
 
        let mut lock = self.inner.write().unwrap();
 

	
 
        // Reserves a new port. Doesn't point it to its counterpart
 
        fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option<u32>) -> u32 {
 
        fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option<ConnectorId>) -> u32 {
 
            let index;
 
            let ownership = if creating_connector.is_some() { PortOwnership::Owned } else { PortOwnership::Unowned };
 
            let connector_id = creating_connector.unwrap_or(0);
 
            let (ownership, connector_id) = if creating_connector.is_some() {
 
                (PortOwnership::Owned, creating_connector.unwrap())
 
            } else {
 
                (PortOwnership::Unowned, ConnectorId::new_invalid())
 
            };
 

	
 
            if lock.free.is_empty() {
 
                index = lock.ports.len() as u32;
 
@@ -237,10 +285,12 @@ impl Drop for PortStore {
 
///
 
/// TODO: @docs
 
/// TODO: @Optimize, very lazy implementation of concurrent datastructures.
 
///     This includes the `should_exit` and `did_exit` pair!
 
pub struct GlobalStore {
 
    pub connector_queue: MpmcQueue<ConnectorKey>,
 
    pub connectors: ConnectorStore,
 
    pub ports: PortStore,
 
    pub should_exit: AtomicBool,    // signal threads to exit
 
}
 

	
 
impl GlobalStore {
 
@@ -249,6 +299,7 @@ impl GlobalStore {
 
            connector_queue: MpmcQueue::with_capacity(256),
 
            connectors: ConnectorStore::with_capacity(256),
 
            ports: PortStore::with_capacity(256),
 
            should_exit: AtomicBool::new(false),
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/inbox.rs
Show inline comments
 
/**
 
inbox.rs
 

	
 
Contains various types of inboxes and message types for the connectors. There
 
are two kinds of inboxes:
 

	
 
The `PublicInbox` is a simple message queue. Messages are put in by various
 
threads, and they're taken out by a single thread. These messages may contain
 
control messages and may be filtered or redirected by the scheduler.
 

	
 
The `PrivateInbox` is a temporary storage for all messages that are received
 
within a certain sync-round.
 
**/
 

	
 
use std::collections::VecDeque;
 
use std::sync::{RwLock, RwLockReadGuard, Mutex};
 
use std::sync::atomic::{AtomicUsize, Ordering};
 

	
 
use crate::protocol::eval::ValueGroup;
 
use crate::runtime2::connector::{BranchId, PortIdLocal};
 
use super::connector::{BranchId, PortIdLocal};
 
use super::global_store::ConnectorId;
 

	
 
/// A message prepared by a connector. Waiting to be picked up by the runtime to
 
/// be sent to another connector.
 
#[derive(Clone)]
 
pub struct OutboxMessage {
 
pub struct OutgoingMessage {
 
    pub sending_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
 
    pub sender_cur_branch_id: BranchId, // always valid
 
    pub message: ValueGroup,
 
}
 

	
 
/// A message inserted into the inbox of a connector by the runtime.
 
/// A message that has been delivered (after being imbued with the receiving
 
/// port by the scheduler) to a connector.
 
#[derive(Clone)]
 
pub struct InboxMessage {
 
pub struct DataMessage {
 
    pub sending_connector: ConnectorId,
 
    pub sending_port: PortIdLocal,
 
    pub receiving_port: PortIdLocal,
 
    pub sender_prev_branch_id: BranchId,
 
@@ -25,48 +42,80 @@ pub struct InboxMessage {
 
    pub message: ValueGroup,
 
}
 

	
 
/// A message sent between connectors to communicate something about their
 
/// scheduling state.
 
pub enum ControlMessage {
 
    ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID)
 
    Ack(u32), // (control message ID)
 
/// A control message. These might be sent by the scheduler to notify eachother
 
/// of asynchronous state changes.
 
pub struct ControlMessage {
 
    pub id: u32, // generic identifier, used to match request to response
 
    pub sender: ConnectorId,
 
    pub content: ControlMessageVariant,
 
}
 

	
 
pub enum ControlMessageVariant {
 
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
 
    Ack, // acknowledgement of previous control message, matching occurs through control message ID.
 
}
 

	
 
/// Generic message in the `PublicInbox`, handled by the scheduler (which takes
 
/// out and handles all control message and potential routing). The correctly
 
/// addressed `Data` variants will end up at the connector.
 
pub enum Message {
 
    Data(DataMessage),
 
    Control(ControlMessage),
 
}
 

	
 
/// The public inbox of a connector. The thread running the connector that owns
 
/// this inbox may retrieved from it. Non-owning threads may only put new
 
/// messages inside of it.
 
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
 
//  Should behave as a MPSC queue.
 
pub struct PublicInbox {
 
    messages: Mutex<VecDeque<Message>>,
 
}
 

	
 
/// The inbox of a connector. The owning connector (i.e. the thread that is
 
/// executing the connector) should be able to read all messages. Other
 
/// connectors (potentially executed by different threads) should be able to
 
/// append messages.
 
///
 
/// If a connector has no more code to run, and its inbox does not contain any
 
/// new messages, then it may go into sleep mode.
 
///
 
// TODO: @Optimize, this is a temporary lazy implementation
 
pub struct Inbox {
 
impl PublicInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: Mutex::new(VecDeque::new()),
 
        }
 
    }
 

	
 
    pub fn insert_message(&self, message: Message) {
 
        let mut lock = self.messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_message(&self) -> Option<Message> {
 
        let mut lock = self.messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 

	
 
    pub fn is_empty(&self) -> bool {
 
        let lock = self.messages.lock().unwrap();
 
        return lock.is_empty();
 
    }
 
}
 

	
 
pub struct PrivateInbox {
 
    // "Normal" messages, intended for a PDL protocol. These need to stick
 
    // around during an entire sync-block (to handle `put`s for which the
 
    // corresponding `get`s have not yet been reached).
 
    messages: RwLock<Vec<InboxMessage>>,
 
    len_read: AtomicUsize,
 
    // System messages. These are handled by the scheduler and only need to be
 
    // handled once.
 
    system_messages: Mutex<VecDeque<ControlMessage>>,
 
    messages: Vec<DataMessage>,
 
    len_read: usize,
 
}
 

	
 
impl Inbox {
 
impl PrivateInbox {
 
    pub fn new() -> Self {
 
        Self{
 
            messages: RwLock::new(Vec::new()),
 
            len_read: AtomicUsize::new(0),
 
            system_messages: Mutex::new(VecDeque::new()),
 
            messages: Vec::new(),
 
            len_read: 0,
 
        }
 
    }
 

	
 
    /// Will insert the message into the inbox. Only exception is when the tuple
 
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
 
    /// nothing is inserted..
 
    pub fn insert_message(&self, message: InboxMessage) {
 
        let mut messages = self.messages.write().unwrap();
 
        for existing in messages.iter() {
 
    pub fn insert_message(&mut self, message: DataMessage) {
 
        for existing in self.messages.iter() {
 
            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
 
                    existing.sender_cur_branch_id == message.sender_cur_branch_id &&
 
                    existing.receiving_port == message.receiving_port {
 
@@ -74,7 +123,8 @@ impl Inbox {
 
                return;
 
            }
 
        }
 
        messages.push(message);
 

	
 
        self.messages.push(message);
 
    }
 

	
 
    /// Retrieves all previously read messages that satisfy the provided
 
@@ -86,11 +136,10 @@ impl Inbox {
 
    /// could be received by a newly encountered `get` call in a connector's
 
    /// PDL code.
 
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
 
        let lock = self.messages.read().unwrap();
 
        return InboxMessageIter{
 
            lock,
 
            messages: &self.messages,
 
            next_index: 0,
 
            max_index: self.len_read.load(Ordering::Acquire),
 
            max_index: self.len_read,
 
            match_port_id: port_id,
 
            match_prev_branch_id: prev_branch_id,
 
        };
 
@@ -98,58 +147,26 @@ impl Inbox {
 

	
 
    /// Retrieves the next unread message. Should only be called by the
 
    /// inbox-reader.
 
    pub fn next_message(&self) -> Option<InboxMessageRef> {
 
        let lock = self.messages.read().unwrap();
 
        let cur_index = self.len_read.load(Ordering::Acquire);
 
        if cur_index >= lock.len() {
 
    pub fn next_message(&mut self) -> Option<&DataMessage> {
 
        if self.len_read == self.messages.len() {
 
            return None;
 
        }
 

	
 
        // TODO: Accept the correctness and simply make it an add, or even
 
        //  remove the atomic altogether.
 
        if let Err(_) = self.len_read.compare_exchange(cur_index, cur_index + 1, Ordering::AcqRel, Ordering::Acquire) {
 
            panic!("multiple readers modifying number of messages read");
 
        }
 

	
 
        return Some(InboxMessageRef{
 
            lock,
 
            index: cur_index,
 
        });
 
        let to_return = &self.messages[self.len_read];
 
        self.len_read += 1;
 
        return Some(to_return);
 
    }
 

	
 
    /// Simply empties the inbox
 
    pub fn clear(&mut self) {
 
        self.messages.clear();
 
    }
 

	
 
    pub fn insert_control_message(&self, message: ControlMessage) {
 
        let mut lock = self.system_messages.lock().unwrap();
 
        lock.push_back(message);
 
    }
 

	
 
    pub fn take_control_message(&self) -> Option<ControlMessage> {
 
        let mut lock = self.system_messages.lock().unwrap();
 
        return lock.pop_front();
 
    }
 
}
 

	
 
/// Reference to a new message
 
pub struct InboxMessageRef<'i> {
 
    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
 
    index: usize,
 
}
 

	
 
impl<'i> std::ops::Deref for InboxMessageRef<'i> {
 
    type Target = InboxMessage;
 

	
 
    fn deref(&self) -> &'i Self::Target {
 
        return &self.lock[self.index];
 
        self.len_read = 0;
 
    }
 
}
 

	
 
/// Iterator over previously received messages in the inbox.
 
pub struct InboxMessageIter<'i> {
 
    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
 
    messages: &'i Vec<DataMessage>,
 
    next_index: usize,
 
    max_index: usize,
 
    match_port_id: PortIdLocal,
 
@@ -157,12 +174,12 @@ pub struct InboxMessageIter<'i> {
 
}
 

	
 
impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> {
 
    type Item = &'m InboxMessage;
 
    type Item = &'m DataMessage;
 

	
 
    fn next(&'m mut self) -> Option<Self::Item> {
 
        // Loop until match is found or at end of messages
 
        while self.next_index < self.max_index {
 
            let cur_message = &self.lock[self.next_index];
 
            let cur_message = &self.messages[self.next_index];
 
            if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
 
                // Found a match
 
                break;
 
@@ -175,7 +192,7 @@ impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> {
 
            return None;
 
        }
 

	
 
        let message = &self.lock[self.next_index];
 
        let message = &self.messages[self.next_index];
 
        self.next_index += 1;
 
        return Some(message);
 
    }
src/runtime2/mod.rs
Show inline comments
 
// Structure of module
 

	
 
mod runtime;
 
mod messages;
 
mod connector;
 
mod port;
 
mod global_store;
 
mod scheduler;
 
mod inbox;
 

	
 
#[cfg(test)] mod tests;
 
mod inbox;
 

	
 
// Imports
 

	
 
use std::sync::Arc;
 
use std::thread::{self, JoinHandle};
 

	
 
use crate::protocol::eval::*;
 
use crate::{common::Id, PortId, ProtocolDescription};
 

	
 
use global_store::GlobalStore;
 
use scheduler::Scheduler;
 
use crate::protocol::ComponentCreationError;
 
use crate::runtime2::connector::{Branch, Connector, find_ports_in_value_group};
 

	
 

	
 
// Runtime API
 
pub struct Runtime {
 
    global_store: Arc<GlobalStore>,
 
    protocol_description: Arc<ProtocolDescription>,
 
    schedulers: Vec<JoinHandle<()>>
 
}
 

	
 
impl Runtime {
 
    pub fn new(num_threads: usize, protocol_description: Arc<ProtocolDescription>) -> Runtime {
 
        // Setup global state
 
        assert!(num_threads > 0, "need a thread to run connectors");
 
        let global_store = Arc::new(GlobalStore::new());
 

	
 
        // Launch threads
 
        let mut schedulers = Vec::with_capacity(num_threads);
 
        for _ in 0..num_threads {
 
            let mut scheduler = Scheduler::new(global_store.clone(), protocol_description.clone());
 
            let thread = thread::spawn(move || {
 
                scheduler.run();
 
            });
 

	
 
            schedulers.push(thread);
 
        }
 

	
 
        // Move innards into runtime struct
 
        return Runtime{
 
            global_store,
 
            protocol_description,
 
            schedulers,
 
        }
 
    }
 

	
 
    /// Returns (putter port, getter port)
 
    pub fn create_channel(&self) -> (Value, Value) {
 
        let channel = self.global_store.ports.create_channel(None);
 
        let putter_value = Value::Output(PortId(Id{
 
            connector_id: u32::MAX,
 
            u32_suffix: channel.putter_id,
 
        }));
 
        let getter_value = Value::Input(PortId(Id{
 
            connector_id: u32::MAX,
 
            u32_suffix: channel.getter_id,
 
        }));
 
        return (putter_value, getter_value);
 
    }
 

	
 
    pub fn create_connector(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), ComponentCreationError> {
 
        // TODO: Remove component creation function from PD, should not be concerned with it
 
        // Create the connector and mark the ports as now owned by the
 
        // connector
 
        let mut port_ids = Vec::new();
 
        find_ports_in_value_group(&values, &mut port_ids);
 

	
 
        let component_state = self.protocol_description.new_component_v2(module.as_bytes(), procedure.as_bytes(), values)?;
 
        let connector = Connector::new(0, Branch::new_initial_branch(component_state), port_ids.clone());
 
        let connector_key = self.global_store.connectors.create(connector);
 

	
 
        for port_id in port_ids {
 
            let port = self.global_store.ports.get(&connector_key, port_id);
 
            port.owning_connector = connector_key.downcast();
 
            port.peer_connector
 
            // TODO: Note that we immediately need to notify the other side of the connector that
 
            //  the port has moved!
 
        }
 
    }
 
}
 

	
 
impl Drop for Runtime {
 
    fn drop(&mut self) {
 

	
 
    }
 
}
 
\ No newline at end of file
src/runtime2/port.rs
Show inline comments
 
use super::global_store::ConnectorId;
 

	
 
#[derive(Clone, Copy, PartialEq, Eq)]
 
pub(crate) struct PortIdLocal {
 
    pub index: u32,
 
@@ -40,8 +42,8 @@ pub struct Port {
 
    pub kind: PortKind,
 
    // But this can be changed, but only by the connector that owns it
 
    pub ownership: PortOwnership,
 
    pub owning_connector: u32,
 
    pub peer_connector: u32, // might be temporarily inconsistent while peer port is sent around in non-sync phase.
 
    pub owning_connector: ConnectorId,
 
    pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase.
 
}
 

	
 

	
src/runtime2/scheduler.rs
Show inline comments
 
use std::sync::Arc;
 
use std::sync::Condvar;
 
use std::sync::atomic::Ordering;
 
use std::time::Duration;
 
use std::thread;
 

	
 
use crate::ProtocolDescription;
 

	
 
use super::inbox::InboxMessage;
 
use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
 
use super::global_store::GlobalStore;
 
use super::port::{PortIdLocal};
 
use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant};
 
use super::connector::{Connector, ConnectorPublic, ConnectorScheduling, RunDeltaState};
 
use super::global_store::{ConnectorKey, ConnectorId, GlobalStore};
 

	
 
struct Scheduler {
 
pub(crate) struct Scheduler {
 
    global: Arc<GlobalStore>,
 
    code: Arc<ProtocolDescription>,
 
}
 
@@ -23,82 +27,268 @@ impl Scheduler {
 
    pub fn run(&mut self) {
 
        // Setup global storage and workspaces that are reused for every
 
        // connector that we run
 
        // TODO: @Memory, scheme for reducing allocations if excessive.
 
        let mut delta_state = RunDeltaState::new();
 

	
 
        loop {
 
            // TODO: Check if we're supposed to exit
 

	
 
        'thread_loop: loop {
 
            // Retrieve a unit of work
 
            let connector_key = self.global.connector_queue.pop_front();
 
            if connector_key.is_none() {
 
                // TODO: @Performance, needs condition variable for waking up
 
                // TODO: @Performance, needs condition or something, and most
 
                //  def' not sleeping
 
                thread::sleep(Duration::new(1, 0));
 
                continue
 
                if self.global.should_exit.load(Ordering::Acquire) {
 
                    // Thread exits!
 
                    break 'thread_loop;
 
                }
 

	
 
                continue 'thread_loop;
 
            }
 

	
 
            // We have something to do
 
            let connector_key = connector_key.unwrap();
 
            let connector = self.global.connectors.get_mut(&connector_key);
 
            let scheduled = self.global.connectors.get_mut(&connector_key);
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 

	
 
            while cur_schedule == ConnectorScheduling::Immediate {
 
                let new_schedule;
 
                // Check all the message that are in the shared inbox
 
                while let Some(message) = scheduled.public.inbox.take_message() {
 
                    match message {
 
                        Message::Data(message) => {
 
                            // Check if we need to reroute, or can just put it
 
                            // in the private inbox of the connector
 
                            if let Some(other_connector_id) = scheduled.router.should_reroute(&message) {
 
                                self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message));
 
                            } else {
 
                                scheduled.connector.inbox.insert_message(message);
 
                            }
 
                        },
 
                        Message::Control(message) => {
 
                            match message.content {
 
                                ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => {
 
                                    // Need to change port target
 
                                    let port = self.global.ports.get(&connector_key, port_id);
 
                                    port.peer_connector = new_target_connector_id;
 
                                    debug_assert!(delta_state.outbox.is_empty());
 

	
 
                // TODO: Check inbox for new message
 
                                    // And respond with an Ack
 
                                    self.send_message_and_wake_up_if_sleeping(
 
                                        message.sender,
 
                                        Message::Control(ControlMessage{
 
                                            id: message.id,
 
                                            sender: connector_key.downcast(),
 
                                            content: ControlMessageVariant::Ack,
 
                                        })
 
                                    );
 
                                },
 
                                ControlMessageVariant::Ack => {
 
                                    scheduled.router.handle_ack(message.id);
 
                                }
 
                            }
 
                        }
 
                    }
 
                }
 

	
 
                if connector.is_in_sync_mode() {
 
                // Actually run the connector
 
                let new_schedule;
 
                if scheduled.connector.is_in_sync_mode() {
 
                    // In synchronous mode, so we can expect messages being sent,
 
                    // but we never expect the creation of connectors
 
                    new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
 
                    new_schedule = scheduled.connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state);
 
                    debug_assert!(delta_state.new_connectors.is_empty());
 

	
 
                    if !delta_state.outbox.is_empty() {
 
                        // There are message to send
 
                        for message in delta_state.outbox.drain(..) {
 
                            let (inbox_message, target_connector_id) = {
 
                                // Note: retrieving a port incurs a read lock
 
                                let sending_port = self.global.ports.get(&connector_key, message.sending_port);
 
                                (
 
                                    InboxMessage {
 
                                        sending_port: sending_port.self_id,
 
                                        receiving_port: sending_port.peer_id,
 
                                        sender_prev_branch_id: message.sender_prev_branch_id,
 
                                        sender_cur_branch_id: message.sender_cur_branch_id,
 
                                        message: message.message,
 
                                    },
 
                                    sending_port.peer_connector,
 
                                )
 
                            };
 

	
 
                            let target_connector = self.global.connectors.get_shared(target_connector_id);
 
                            target_connector.inbox.insert_message(inbox_message);
 

	
 
                            // TODO: Check silent state. Queue connector if it was silent
 
                        }
 
                    }
 
                } else {
 
                    // In regular running mode (not in a sync block) we cannot send
 
                    // messages but we can create new connectors
 
                    new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
 
                    new_schedule = scheduled.connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state);
 
                    debug_assert!(delta_state.outbox.is_empty());
 
                }
 

	
 
                    if !delta_state.new_connectors.is_empty() {
 
                        // Push all connectors into the global state and queue them
 
                        // for execution
 
                        for connector in delta_state.new_connectors.drain(..) {
 
                            // Create connector, modify all of the ports that
 
                            // it now owns, then queue it for execution
 
                            let connector_key = self.global.connectors.create(connector);
 
                            
 
                // Handle all of the output from the current run: messages to
 
                // send and connectors to instantiate.
 
                self.handle_delta_state(&connector_key, &mut delta_state);
 

	
 
                cur_schedule = new_schedule;
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
            // So enqueue it if requested, and otherwise put it in a sleeping
 
            // state.
 
            match cur_schedule {
 
                ConnectorScheduling::Immediate => unreachable!(),
 
                ConnectorScheduling::Later => {
 
                    // Simply queue it again later
 
                    self.global.connector_queue.push_back(connector_key);
 
                },
 
                ConnectorScheduling::NotNow => {
 
                    // Need to sleep, note that we are the only ones which are
 
                    // allows to set the sleeping state to `true`, and since
 
                    // we're running it must currently be `false`.
 
                    debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false);
 
                    scheduled.public.sleeping.store(true, Ordering::Release);
 

	
 
                    // We might have received a message in the meantime from a
 
                    // thread that did not see the sleeping flag set to `true`,
 
                    // so:
 
                    if !scheduled.public.inbox.is_empty() {
 
                        let should_reschedule_self = scheduled.public.sleeping
 
                            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
                            .is_ok();
 

	
 
                        if should_reschedule_self {
 
                            self.global.connector_queue.push_back(connector_key);
 
                        }
 
                    }
 
                }
 
            }
 
        }
 
    }
 

	
 
                cur_schedule = new_schedule;
 
    fn handle_delta_state(&mut self, connector_key: &ConnectorKey, delta_state: &mut RunDeltaState) {
 
        // Handling any messages that were sent
 
        if !delta_state.outbox.is_empty() {
 
            for message in delta_state.outbox.drain(..) {
 
                let (inbox_message, target_connector_id) = {
 
                    let sending_port = self.global.ports.get(&connector_key, message.sending_port);
 
                    (
 
                        DataMessage {
 
                            sending_connector: connector_key.downcast(),
 
                            sending_port: sending_port.self_id,
 
                            receiving_port: sending_port.peer_id,
 
                            sender_prev_branch_id: message.sender_prev_branch_id,
 
                            sender_cur_branch_id: message.sender_cur_branch_id,
 
                            message: message.message,
 
                        },
 
                        sending_port.peer_connector,
 
                    )
 
                };
 

	
 
                self.send_message_and_wake_up_if_sleeping(target_connector_id, Message::Data(inbox_message));
 
            }
 
        }
 

	
 
        // Handling any new connectors that were scheduled
 
        // TODO: Pool outgoing messages to reduce atomic access
 
        if !delta_state.new_connectors.is_empty() {
 
            let cur_connector = self.global.connectors.get_mut(connector_key);
 

	
 
            for new_connector in delta_state.new_connectors.drain(..) {
 
                // Add to global registry to obtain key
 
                let new_key = self.global.connectors.create(new_connector);
 
                let new_connector = self.global.connectors.get_mut(&new_key);
 

	
 
                // Each port should be lost by the connector that created the
 
                // new one. Note that the creator is the current owner.
 
                for port_id in &new_connector.ports.owned_ports {
 
                    debug_assert!(!cur_connector.ports.owned_ports.contains(port_id));
 

	
 
                    // Modify ownership, retrieve peer connector
 
                    let (peer_connector_id, peer_port_id) = {
 
                        let mut port = self.global.ports.get(connector_key, *port_id);
 
                        port.owning_connector = new_key.downcast();
 

	
 
                        (port.peer_connector, port.peer_id)
 
                    };
 

	
 
                    // Send message that port has changed ownership
 
                    let reroute_message = cur_connector.router.prepare_reroute(
 
                        port_id, peer_port_id, connector_key.downcast(), peer_connector_id, new_key.downcast()
 
                    );
 

	
 
                    self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message);
 
                }
 

	
 
                // Schedule new connector to run
 
                self.global.connector_queue.push_back(new_key);
 
            }
 
        }
 
    }
 

	
 
    pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) {
 
        let connector = self.global.connectors.get_shared(connector_id);
 

	
 
        connector.inbox.insert_message(message);
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe { ConnectorKey::from_id(connector_id) };
 
            self.global.connector_queue.push_back(key);
 
        }
 
    }
 
}
 

	
 
/// Represents a rerouting entry due to a moved port
 
// TODO: Optimize
 
struct ReroutedTraffic {
 
    id: u32,                        // ID of control message
 
    port: PortIdLocal,              // targeted port
 
    source_connector: ConnectorId,  // connector we expect messages from
 
    target_connector: ConnectorId,  // connector they should be rerouted to
 
}
 

	
 
pub(crate) struct Router {
 
    id_counter: u32,
 
    active: Vec<ReroutedTraffic>,
 
}
 

	
 
impl Router {
 
    pub fn new() -> Self {
 
        Router{
 
            id_counter: 0,
 
            active: Vec::new(),
 
        }
 
    }
 

	
 
    /// Prepares rerouting messages due to changed ownership of a port. The
 
    /// control message returned by this function must be sent to the
 
    /// transferred port's peer connector.
 
    pub fn prepare_reroute(
 
        &mut self,
 
        port_id: PortIdLocal, peer_port_id: PortIdLocal,
 
        self_connector_id: ConnectorId, peer_connector_id: ConnectorId,
 
        new_owner_connector_id: ConnectorId
 
    ) -> Message {
 
        let id = self.id_counter;
 
        self.id_counter.overflowing_add(1);
 

	
 
        self.active.push(ReroutedTraffic{
 
            id,
 
            port: port_id,
 
            source_connector: peer_connector_id,
 
            target_connector: new_owner_connector_id,
 
        });
 

	
 
        return Message::Control(ControlMessage{
 
            id,
 
            sender: self_connector_id,
 
            content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id)
 
        });
 
    }
 

	
 
    /// Returns true if the supplied message should be rerouted. If so then this
 
    /// function returns the connector that should retrieve this message.
 
    pub fn should_reroute(&self, message: &DataMessage) -> Option<ConnectorId> {
 
        for reroute in &self.active {
 
            if reroute.source_connector == message.sending_connector &&
 
                reroute.port == message.sending_port {
 
                // Need to reroute this message
 
                return Some(reroute.target_connector);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    /// Handles an Ack as an answer to a previously sent control message
 
    pub fn handle_ack(&mut self, id: u32) {
 
        let index = self.active.iter()
 
            .position(|v| v.id == id);
 

	
 
        match index {
 
            Some(index) => { self.active.remove(index); },
 
            None => { todo!("handling of nefarious ACKs"); },
 
        }
 
    }
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)