diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index fbe0fca45345572d5c03507ce1147e34341b357e..f8fa2a8c7997917373315e46cdab473febf83d45 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -44,6 +44,15 @@ pub(crate) enum EvalContext<'a> { } ////////////////////////////////////////////// +#[derive(Debug)] +pub enum ComponentCreationError { + ModuleDoesntExist, + DefinitionDoesntExist, + DefinitionNotComponent, + InvalidNumArguments, + InvalidArgumentType(usize), +} + impl std::fmt::Debug for ProtocolDescription { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "(An opaque protocol description)") @@ -79,6 +88,8 @@ impl ProtocolDescription { pool: Mutex::new(parser.string_pool), }); } + + #[deprecated] pub(crate) fn component_polarities( &self, module_name: &[u8], @@ -130,7 +141,9 @@ impl ProtocolDescription { } Ok(result) } + // expects port polarities to be correct + #[deprecated] pub(crate) fn new_component(&self, module_name: &[u8], identifier: &[u8], ports: &[PortId]) -> ComponentState { let mut args = Vec::new(); for (&x, y) in ports.iter().zip(self.component_polarities(module_name, identifier).unwrap()) { @@ -147,6 +160,59 @@ impl ProtocolDescription { ComponentState { prompt: Prompt::new(&self.types, &self.heap, def, 0, ValueGroup::new_stack(args)) } } + // TODO: Ofcourse, rename this at some point, perhaps even remove it in its + // entirety. Find some way to interface with the parameter's types. + pub(crate) fn new_component_v2( + &self, module_name: &[u8], identifier: &[u8], arguments: ValueGroup + ) -> Result { + // Find the module in which the definition can be found + let module_root = self.lookup_module_root(module_name); + if module_root.is_none() { + return Err(ComponentCreationError::ModuleDoesntExist); + } + let module_root = module_root.unwrap(); + + let root = &self.heap[module_root]; + let definition_id = root.get_definition_ident(&heap, identifier); + if definition_id.is_none() { + return Err(ComponentCreationError::DefinitionDoesntExist); + } + let definition_id = definition_id.unwrap(); + + let definition = &self.heap[definition_id]; + if !definition.is_component() { + return Err(ComponentCreationError::DefinitionNotComponent); + } + + // Make sure that the types of the provided value group matches that of + // the expected types. + let definition = definition.as_component(); + if !definition.poly_vars.is_empty() { + return Err(ComponentCreationError::DefinitionNotComponent); + } + + // - check number of arguments + let expr_data = self.types.get_procedure_expression_data(&definition_id, 0); + if expr_data.arg_types.len() != arguments.values.len() { + return Err(ComponentCreationError::InvalidNumArguments); + } + + // - for each argument try to make sure the types match + for arg_idx in 0..arguments.values.len() { + let expected_type = &expr_data.arg_types[arg_idx]; + let provided_value = &arguments.values[arg_idx]; + if !self.verify_same_type(expected_type, 0, &arguments, provided_value) { + return Err(ComponentCreationError::InvalidArgumentType(arg_idx)); + } + } + + // By now we're sure that all of the arguments are correct. So create + // the connector. + return Ok(ComponentState{ + prompt: Prompt::new(&self.types, &self.heap, def, 0, arguments), + }); + } + fn lookup_module_root(&self, module_name: &[u8]) -> Option { for module in self.modules.iter() { match &module.name { @@ -161,6 +227,63 @@ impl ProtocolDescription { return None; } + + fn verify_same_type(&self, expected: &ConcreteType, expected_idx: usize, arguments: &ValueGroup, argument: &Value) -> bool { + use ConcreteTypePart as CTP; + + macro_rules! match_variant { + ($value:expr, $variant:expr) => { + if let $variant(_) = $value { true } else { false } + }; + } + + match &expected.parts[expected_idx] { + CTP::Void | CTP::Message | CTP::Slice | CTP::Function(_, _) | CTP::Component(_, _) => unreachable!(), + CTP::Bool => match_variant!(argument, Value::Bool), + CTP::UInt8 => match_variant!(argument, Value::UInt8), + CTP::UInt16 => match_variant!(argument, Value::UInt16), + CTP::UInt32 => match_variant!(argument, Value::UInt32), + CTP::UInt64 => match_variant!(argument, Value::UInt64), + CTP::SInt8 => match_variant!(argument, Value::SInt8), + CTP::SInt16 => match_variant!(argument, Value::SInt16), + CTP::SInt32 => match_variant!(argument, Value::SInt32), + CTP::SInt64 => match_variant!(argument, Value::SInt64), + CTP::Character => match_variant!(argument, Value::Char), + CTP::String => { + // Match outer string type and embedded character types + if let Value::String(heap_pos) = argument { + for element in &arguments.regions[*heap_pos as usize] { + if let Value::Char(_) = element {} else { + return false; + } + } + } else { + return false; + } + + return true; + }, + CTP::Array => { + if let Value::Array(heap_pos) = argument { + let heap_pos = *heap_pos; + for element in &arguments.regions[heap_pos as usize] { + if !self.verify_same_type(expected, expected_idx + 1, arguments, element) { + return false; + } + } + return true; + } else { + return false; + } + }, + CTP::Input => match_variant!(argument, Value::Input), + CTP::Output => match_variant!(argument, Value::Output), + CTP::Instance(_definition_id, _num_embedded) => { + todo!("implement full type checking on user-supplied arguments"); + return false; + }, + } + } } // TODO: @temp Should just become a concrete thing that is passed in diff --git a/src/protocol/parser/pass_typing.rs b/src/protocol/parser/pass_typing.rs index 46b17c2d98ac567766dd264cc04bf251aa068be1..0b858460b6a7c1ad2e9bf19867c760b3a56c63fd 100644 --- a/src/protocol/parser/pass_typing.rs +++ b/src/protocol/parser/pass_typing.rs @@ -1550,14 +1550,31 @@ impl PassTyping { // Every expression checked, and new monomorphs are queued. Transfer the // expression information to the type table. - let definition_id = match &self.definition_type { - DefinitionType::Component(id) => id.upcast(), - DefinitionType::Function(id) => id.upcast(), + let (definition_id, procedure_arguments) = match &self.definition_type { + DefinitionType::Component(id) => { + let definition = &ctx.heap[*id]; + (id.upcast(), &definition.parameters) + }, + DefinitionType::Function(id) => { + let definition = &ctx.heap[*id]; + (id.upcast(), &definition.parameters) + }, }; let target = ctx.types.get_procedure_expression_data_mut(&definition_id, self.reserved_idx); - debug_assert!(target.expr_data.is_empty()); // makes sure we never queue something twice + debug_assert!(target.arg_types.is_empty()); // makes sure we never queue a procedure's type inferencing twice + debug_assert!(target.expr_data.is_empty()); + + // - Write the arguments to the procedure + target.arg_types.reserve(procedure_arguments.len()); + for argument_id in procedure_arguments { + let mut concrete = ConcreteType::default(); + let argument_type = self.var_types.get(argument_id).unwrap(); + argument_type.var_type.write_concrete_type(&mut concrete); + target.arg_types.push(concrete); + } + // - Write the expression data target.expr_data.reserve(self.expr_types.len()); for infer_expr in self.expr_types.iter() { let mut concrete = ConcreteType::default(); diff --git a/src/protocol/parser/type_table.rs b/src/protocol/parser/type_table.rs index 6acf92b60bc3cebb569418508f605236a7255dde..c0ba521348fed41e6ca2996099bcb86f6128b16a 100644 --- a/src/protocol/parser/type_table.rs +++ b/src/protocol/parser/type_table.rs @@ -320,6 +320,7 @@ pub struct PolymorphicVariable { pub struct ProcedureMonomorph { // Expression data for one particular monomorph pub concrete_type: ConcreteType, + pub arg_types: Vec, pub expr_data: Vec, } @@ -647,6 +648,7 @@ impl TypeTable { let mono_idx = mono_types.len(); mono_types.push(ProcedureMonomorph{ concrete_type, + arg_types: Vec::new(), expr_data: Vec::new(), }); diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 3f75f4875bc9236146b9d7e95b5b23d8769bf670..b6c1c4b3d0221c6223fe21e9be93bfde25cd7a9e 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; +use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::inbox::{Inbox, OutboxMessage}; +use crate::runtime2::inbox::{PrivateInbox, PublicInbox, OutgoingMessage}; use crate::runtime2::port::PortIdLocal; /// Represents the identifier of a branch (the index within its container). An @@ -51,14 +52,14 @@ pub(crate) struct Branch { sync_state: SpeculativeState, next_branch_in_queue: Option, // Message/port state - inbox: HashMap, // TODO: @temporary, remove together with fires() + inbox: HashMap, // TODO: @temporary, remove together with fires() ports_delta: Vec, } impl Branch { /// Constructs a non-sync branch. It is assumed that the code is at the /// first instruction - fn new_initial_branch(component_state: ComponentState) -> Self { + pub(crate) fn new_initial_branch(component_state: ComponentState) -> Self { Branch{ index: BranchId::new_invalid(), parent_index: BranchId::new_invalid(), @@ -135,13 +136,13 @@ enum PortOwnershipError { /// As the name implies, this contains a description of the ports associated /// with a connector. /// TODO: Extend documentation -struct ConnectorPorts { +pub(crate) struct ConnectorPorts { // Essentially a mapping from `port_index` to `port_id`. - owned_ports: Vec, + pub owned_ports: Vec, // Contains P*B entries, where P is the number of ports and B is the number // of branches. One can find the appropriate mapping of port p at branch b // at linear index `b*P+p`. - port_mapping: Vec + pub port_mapping: Vec } impl ConnectorPorts { @@ -246,22 +247,23 @@ impl BranchQueue { } /// Public fields of the connector that can be freely shared between multiple -/// threads. Note that this is not enforced by the compiler. The global store -/// allows retrieving the entire `Connector` as a mutable reference by one -/// thread, and this `ConnectorPublic` by any number of threads. +/// threads. pub(crate) struct ConnectorPublic { - pub inbox: Inbox, + pub inbox: PublicInbox, + pub sleeping: AtomicBool, } impl ConnectorPublic { pub fn new() -> Self { ConnectorPublic{ - inbox: Inbox::new(), + inbox: PublicInbox::new(), + sleeping: AtomicBool::new(false), } } } // TODO: Maybe prevent false sharing by aligning `public` to next cache line. +// TODO: Do this outside of the connector, create a wrapping struct pub(crate) struct Connector { // State and properties of connector itself id: u32, @@ -272,8 +274,8 @@ pub(crate) struct Connector { sync_pending_get: BranchQueue, sync_finished: BranchQueue, // Port/message management + pub inbox: PrivateInbox, pub ports: ConnectorPorts, - pub public: ConnectorPublic, } struct TempCtx {} @@ -307,8 +309,8 @@ impl Connector { sync_active: BranchQueue::new(), sync_pending_get: BranchQueue::new(), sync_finished: BranchQueue::new(), + inbox: PrivateInbox::new(), ports: ConnectorPorts::new(owned_ports), - public: ConnectorPublic::new(), } } @@ -471,7 +473,7 @@ impl Connector { // Put in run results for thread to pick up and transfer to // the correct connector inbox. port_mapping.mark_definitive(branch.index, 1); - let message = OutboxMessage { + let message = OutgoingMessage { sending_port: local_port_id, sender_prev_branch_id: BranchId::new_invalid(), sender_cur_branch_id: branch.index, @@ -709,7 +711,7 @@ impl Connector { pub(crate) struct RunDeltaState { // Variables that allow the thread running the connector to pick up global // state changes and try to apply them. - pub outbox: Vec, + pub outbox: Vec, pub new_connectors: Vec, // Workspaces pub ports: Vec, @@ -736,7 +738,7 @@ pub(crate) enum ConnectorScheduling { /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. -fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { +pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { // Helper to check a value for a port and recurse if needed. fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { match value { diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs index 9b75ae2a4536113990a55afca44b831331b5e84f..96a30954aeedf71e9f3684d89b0cef2550aff4b1 100644 --- a/src/runtime2/global_store.rs +++ b/src/runtime2/global_store.rs @@ -2,13 +2,53 @@ use crate::collections::{MpmcQueue, RawVec}; use super::connector::{Connector, ConnectorPublic}; use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel}; +use super::inbox::PublicInbox; +use super::scheduler::Router; use std::ptr; -use std::sync::{RwLock, RwLockReadGuard}; +use std::sync::{Barrier, RwLock, RwLockReadGuard}; +use std::sync::atomic::AtomicBool; + +/// A kind of token that, once obtained, allows mutable access to a connector. +/// We're trying to use move semantics as much as possible: the owner of this +/// key is the only one that may execute the connector's code. +pub(crate) struct ConnectorKey { + pub index: u32, // of connector +} + +impl ConnectorKey { + /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable + /// access, to a "regular ID" which can be used to obtain immutable access. + #[inline] + pub fn downcast(&self) -> ConnectorId { + return ConnectorId(self.index); + } + + /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it + /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system + #[inline] + pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { + return ConnectorKey{ index: id.0 }; + } +} -/// A kind of token that, once obtained, allows access to a container. -struct ConnectorKey { - index: u32, // of connector +/// A kind of token that allows shared access to a connector. Multiple threads +/// may hold this +#[derive(Copy, Clone)] +pub(crate) struct ConnectorId(u32); + +impl ConnectorId { + // TODO: Like the other `new_invalid`, maybe remove + #[inline] + pub fn new_invalid() -> ConnectorId { + return ConnectorId(u32::MAX); + } +} + +pub struct ScheduledConnector { + pub connector: Connector, + pub public: ConnectorPublic, + pub router: Router } /// The registry containing all connectors. The idea here is that when someone @@ -21,7 +61,7 @@ struct ConnectorStore { } struct ConnectorStoreInner { - connectors: RawVec<*mut Connector>, + connectors: RawVec<*mut ScheduledConnector>, free: Vec, } @@ -36,11 +76,11 @@ impl ConnectorStore { } /// Retrieves the shared members of the connector. - pub(crate) fn get_shared(&self, connector_id: u32) -> &'static ConnectorPublic { + pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic { let lock = self.inner.read().unwrap(); unsafe { - let connector = lock.connectors.get(connector_id as usize); + let connector = lock.connectors.get(connector_id.0 as usize); debug_assert!(!connector.is_null()); return &*connector.public; } @@ -48,7 +88,7 @@ impl ConnectorStore { /// Retrieves a particular connector. Only the thread that pulled the /// associated key out of the execution queue should (be able to) call this. - pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector { + pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector { let lock = self.inner.read().unwrap(); unsafe { @@ -62,6 +102,11 @@ impl ConnectorStore { /// and/or queue it. pub(crate) fn create(&self, connector: Connector) -> ConnectorKey { let lock = self.inner.write().unwrap(); + let connector = ScheduledConnector{ + connector, + public: ConnectorPublic::new(), + router: Router::new(), + }; let index; if lock.free.is_empty() { @@ -144,14 +189,17 @@ impl PortStore { } } - pub(crate) fn create_channel(&self, creating_connector: Option) -> Channel { + pub(crate) fn create_channel(&self, creating_connector: Option) -> Channel { let mut lock = self.inner.write().unwrap(); // Reserves a new port. Doesn't point it to its counterpart - fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option) -> u32 { + fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option) -> u32 { let index; - let ownership = if creating_connector.is_some() { PortOwnership::Owned } else { PortOwnership::Unowned }; - let connector_id = creating_connector.unwrap_or(0); + let (ownership, connector_id) = if creating_connector.is_some() { + (PortOwnership::Owned, creating_connector.unwrap()) + } else { + (PortOwnership::Unowned, ConnectorId::new_invalid()) + }; if lock.free.is_empty() { index = lock.ports.len() as u32; @@ -237,10 +285,12 @@ impl Drop for PortStore { /// /// TODO: @docs /// TODO: @Optimize, very lazy implementation of concurrent datastructures. +/// This includes the `should_exit` and `did_exit` pair! pub struct GlobalStore { pub connector_queue: MpmcQueue, pub connectors: ConnectorStore, pub ports: PortStore, + pub should_exit: AtomicBool, // signal threads to exit } impl GlobalStore { @@ -249,6 +299,7 @@ impl GlobalStore { connector_queue: MpmcQueue::with_capacity(256), connectors: ConnectorStore::with_capacity(256), ports: PortStore::with_capacity(256), + should_exit: AtomicBool::new(false), } } } \ No newline at end of file diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index d2d89a42df942e5275d196c058fc5b80cc2ddc00..2d64dd23404f8dea01d3441b5adc99902eaba789 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -1,23 +1,40 @@ +/** +inbox.rs + +Contains various types of inboxes and message types for the connectors. There +are two kinds of inboxes: + +The `PublicInbox` is a simple message queue. Messages are put in by various +threads, and they're taken out by a single thread. These messages may contain +control messages and may be filtered or redirected by the scheduler. + +The `PrivateInbox` is a temporary storage for all messages that are received +within a certain sync-round. +**/ + use std::collections::VecDeque; use std::sync::{RwLock, RwLockReadGuard, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use crate::protocol::eval::ValueGroup; -use crate::runtime2::connector::{BranchId, PortIdLocal}; +use super::connector::{BranchId, PortIdLocal}; +use super::global_store::ConnectorId; /// A message prepared by a connector. Waiting to be picked up by the runtime to /// be sent to another connector. #[derive(Clone)] -pub struct OutboxMessage { +pub struct OutgoingMessage { pub sending_port: PortIdLocal, pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id pub sender_cur_branch_id: BranchId, // always valid pub message: ValueGroup, } -/// A message inserted into the inbox of a connector by the runtime. +/// A message that has been delivered (after being imbued with the receiving +/// port by the scheduler) to a connector. #[derive(Clone)] -pub struct InboxMessage { +pub struct DataMessage { + pub sending_connector: ConnectorId, pub sending_port: PortIdLocal, pub receiving_port: PortIdLocal, pub sender_prev_branch_id: BranchId, @@ -25,48 +42,80 @@ pub struct InboxMessage { pub message: ValueGroup, } -/// A message sent between connectors to communicate something about their -/// scheduling state. -pub enum ControlMessage { - ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID) - Ack(u32), // (control message ID) +/// A control message. These might be sent by the scheduler to notify eachother +/// of asynchronous state changes. +pub struct ControlMessage { + pub id: u32, // generic identifier, used to match request to response + pub sender: ConnectorId, + pub content: ControlMessageVariant, +} + +pub enum ControlMessageVariant { + ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port + Ack, // acknowledgement of previous control message, matching occurs through control message ID. +} + +/// Generic message in the `PublicInbox`, handled by the scheduler (which takes +/// out and handles all control message and potential routing). The correctly +/// addressed `Data` variants will end up at the connector. +pub enum Message { + Data(DataMessage), + Control(ControlMessage), +} + +/// The public inbox of a connector. The thread running the connector that owns +/// this inbox may retrieved from it. Non-owning threads may only put new +/// messages inside of it. +// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. +// Should behave as a MPSC queue. +pub struct PublicInbox { + messages: Mutex>, } -/// The inbox of a connector. The owning connector (i.e. the thread that is -/// executing the connector) should be able to read all messages. Other -/// connectors (potentially executed by different threads) should be able to -/// append messages. -/// -/// If a connector has no more code to run, and its inbox does not contain any -/// new messages, then it may go into sleep mode. -/// -// TODO: @Optimize, this is a temporary lazy implementation -pub struct Inbox { +impl PublicInbox { + pub fn new() -> Self { + Self{ + messages: Mutex::new(VecDeque::new()), + } + } + + pub fn insert_message(&self, message: Message) { + let mut lock = self.messages.lock().unwrap(); + lock.push_back(message); + } + + pub fn take_message(&self) -> Option { + let mut lock = self.messages.lock().unwrap(); + return lock.pop_front(); + } + + pub fn is_empty(&self) -> bool { + let lock = self.messages.lock().unwrap(); + return lock.is_empty(); + } +} + +pub struct PrivateInbox { // "Normal" messages, intended for a PDL protocol. These need to stick // around during an entire sync-block (to handle `put`s for which the // corresponding `get`s have not yet been reached). - messages: RwLock>, - len_read: AtomicUsize, - // System messages. These are handled by the scheduler and only need to be - // handled once. - system_messages: Mutex>, + messages: Vec, + len_read: usize, } -impl Inbox { +impl PrivateInbox { pub fn new() -> Self { Self{ - messages: RwLock::new(Vec::new()), - len_read: AtomicUsize::new(0), - system_messages: Mutex::new(VecDeque::new()), + messages: Vec::new(), + len_read: 0, } } /// Will insert the message into the inbox. Only exception is when the tuple /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then /// nothing is inserted.. - pub fn insert_message(&self, message: InboxMessage) { - let mut messages = self.messages.write().unwrap(); - for existing in messages.iter() { + pub fn insert_message(&mut self, message: DataMessage) { + for existing in self.messages.iter() { if existing.sender_prev_branch_id == message.sender_prev_branch_id && existing.sender_cur_branch_id == message.sender_cur_branch_id && existing.receiving_port == message.receiving_port { @@ -74,7 +123,8 @@ impl Inbox { return; } } - messages.push(message); + + self.messages.push(message); } /// Retrieves all previously read messages that satisfy the provided @@ -86,11 +136,10 @@ impl Inbox { /// could be received by a newly encountered `get` call in a connector's /// PDL code. pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter { - let lock = self.messages.read().unwrap(); return InboxMessageIter{ - lock, + messages: &self.messages, next_index: 0, - max_index: self.len_read.load(Ordering::Acquire), + max_index: self.len_read, match_port_id: port_id, match_prev_branch_id: prev_branch_id, }; @@ -98,58 +147,26 @@ impl Inbox { /// Retrieves the next unread message. Should only be called by the /// inbox-reader. - pub fn next_message(&self) -> Option { - let lock = self.messages.read().unwrap(); - let cur_index = self.len_read.load(Ordering::Acquire); - if cur_index >= lock.len() { + pub fn next_message(&mut self) -> Option<&DataMessage> { + if self.len_read == self.messages.len() { return None; } - // TODO: Accept the correctness and simply make it an add, or even - // remove the atomic altogether. - if let Err(_) = self.len_read.compare_exchange(cur_index, cur_index + 1, Ordering::AcqRel, Ordering::Acquire) { - panic!("multiple readers modifying number of messages read"); - } - - return Some(InboxMessageRef{ - lock, - index: cur_index, - }); + let to_return = &self.messages[self.len_read]; + self.len_read += 1; + return Some(to_return); } /// Simply empties the inbox pub fn clear(&mut self) { self.messages.clear(); - } - - pub fn insert_control_message(&self, message: ControlMessage) { - let mut lock = self.system_messages.lock().unwrap(); - lock.push_back(message); - } - - pub fn take_control_message(&self) -> Option { - let mut lock = self.system_messages.lock().unwrap(); - return lock.pop_front(); - } -} - -/// Reference to a new message -pub struct InboxMessageRef<'i> { - lock: RwLockReadGuard<'i, Vec>, - index: usize, -} - -impl<'i> std::ops::Deref for InboxMessageRef<'i> { - type Target = InboxMessage; - - fn deref(&self) -> &'i Self::Target { - return &self.lock[self.index]; + self.len_read = 0; } } /// Iterator over previously received messages in the inbox. pub struct InboxMessageIter<'i> { - lock: RwLockReadGuard<'i, Vec>, + messages: &'i Vec, next_index: usize, max_index: usize, match_port_id: PortIdLocal, @@ -157,12 +174,12 @@ pub struct InboxMessageIter<'i> { } impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> { - type Item = &'m InboxMessage; + type Item = &'m DataMessage; fn next(&'m mut self) -> Option { // Loop until match is found or at end of messages while self.next_index < self.max_index { - let cur_message = &self.lock[self.next_index]; + let cur_message = &self.messages[self.next_index]; if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id { // Found a match break; @@ -175,7 +192,7 @@ impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> { return None; } - let message = &self.lock[self.next_index]; + let message = &self.messages[self.next_index]; self.next_index += 1; return Some(message); } diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 296c8726b8d9714d8bac4dd0869912874ffd96a3..b75ec01ec78f0103f2fa427197c12a89cca226f0 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -1,9 +1,98 @@ +// Structure of module + mod runtime; mod messages; mod connector; mod port; mod global_store; mod scheduler; +mod inbox; #[cfg(test)] mod tests; -mod inbox; + +// Imports + +use std::sync::Arc; +use std::thread::{self, JoinHandle}; + +use crate::protocol::eval::*; +use crate::{common::Id, PortId, ProtocolDescription}; + +use global_store::GlobalStore; +use scheduler::Scheduler; +use crate::protocol::ComponentCreationError; +use crate::runtime2::connector::{Branch, Connector, find_ports_in_value_group}; + + +// Runtime API +pub struct Runtime { + global_store: Arc, + protocol_description: Arc, + schedulers: Vec> +} + +impl Runtime { + pub fn new(num_threads: usize, protocol_description: Arc) -> Runtime { + // Setup global state + assert!(num_threads > 0, "need a thread to run connectors"); + let global_store = Arc::new(GlobalStore::new()); + + // Launch threads + let mut schedulers = Vec::with_capacity(num_threads); + for _ in 0..num_threads { + let mut scheduler = Scheduler::new(global_store.clone(), protocol_description.clone()); + let thread = thread::spawn(move || { + scheduler.run(); + }); + + schedulers.push(thread); + } + + // Move innards into runtime struct + return Runtime{ + global_store, + protocol_description, + schedulers, + } + } + + /// Returns (putter port, getter port) + pub fn create_channel(&self) -> (Value, Value) { + let channel = self.global_store.ports.create_channel(None); + let putter_value = Value::Output(PortId(Id{ + connector_id: u32::MAX, + u32_suffix: channel.putter_id, + })); + let getter_value = Value::Input(PortId(Id{ + connector_id: u32::MAX, + u32_suffix: channel.getter_id, + })); + return (putter_value, getter_value); + } + + pub fn create_connector(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), ComponentCreationError> { + // TODO: Remove component creation function from PD, should not be concerned with it + // Create the connector and mark the ports as now owned by the + // connector + let mut port_ids = Vec::new(); + find_ports_in_value_group(&values, &mut port_ids); + + let component_state = self.protocol_description.new_component_v2(module.as_bytes(), procedure.as_bytes(), values)?; + let connector = Connector::new(0, Branch::new_initial_branch(component_state), port_ids.clone()); + let connector_key = self.global_store.connectors.create(connector); + + for port_id in port_ids { + let port = self.global_store.ports.get(&connector_key, port_id); + port.owning_connector = connector_key.downcast(); + port.peer_connector + // TODO: Note that we immediately need to notify the other side of the connector that + // the port has moved! + } + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + + } +} \ No newline at end of file diff --git a/src/runtime2/port.rs b/src/runtime2/port.rs index df7a7d64c3d949f8f3bbd24aa610e51b9faa170b..f58c6196cffb794ad70cf508abcd8c0b099de85c 100644 --- a/src/runtime2/port.rs +++ b/src/runtime2/port.rs @@ -1,3 +1,5 @@ +use super::global_store::ConnectorId; + #[derive(Clone, Copy, PartialEq, Eq)] pub(crate) struct PortIdLocal { pub index: u32, @@ -40,8 +42,8 @@ pub struct Port { pub kind: PortKind, // But this can be changed, but only by the connector that owns it pub ownership: PortOwnership, - pub owning_connector: u32, - pub peer_connector: u32, // might be temporarily inconsistent while peer port is sent around in non-sync phase. + pub owning_connector: ConnectorId, + pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase. } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 12899dc5c6bb601ef88d144ca083df520971ada3..dd5288e2399b338563d4c1130aa3fbe5f9c05ee5 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,13 +1,17 @@ use std::sync::Arc; +use std::sync::Condvar; +use std::sync::atomic::Ordering; use std::time::Duration; use std::thread; + use crate::ProtocolDescription; -use super::inbox::InboxMessage; -use super::connector::{Connector, ConnectorScheduling, RunDeltaState}; -use super::global_store::GlobalStore; +use super::port::{PortIdLocal}; +use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant}; +use super::connector::{Connector, ConnectorPublic, ConnectorScheduling, RunDeltaState}; +use super::global_store::{ConnectorKey, ConnectorId, GlobalStore}; -struct Scheduler { +pub(crate) struct Scheduler { global: Arc, code: Arc, } @@ -23,82 +27,268 @@ impl Scheduler { pub fn run(&mut self) { // Setup global storage and workspaces that are reused for every // connector that we run - // TODO: @Memory, scheme for reducing allocations if excessive. let mut delta_state = RunDeltaState::new(); - loop { - // TODO: Check if we're supposed to exit - + 'thread_loop: loop { // Retrieve a unit of work let connector_key = self.global.connector_queue.pop_front(); if connector_key.is_none() { - // TODO: @Performance, needs condition variable for waking up + // TODO: @Performance, needs condition or something, and most + // def' not sleeping thread::sleep(Duration::new(1, 0)); - continue + if self.global.should_exit.load(Ordering::Acquire) { + // Thread exits! + break 'thread_loop; + } + + continue 'thread_loop; } // We have something to do let connector_key = connector_key.unwrap(); - let connector = self.global.connectors.get_mut(&connector_key); + let scheduled = self.global.connectors.get_mut(&connector_key); + // Keep running until we should no longer immediately schedule the + // connector. let mut cur_schedule = ConnectorScheduling::Immediate; - while cur_schedule == ConnectorScheduling::Immediate { - let new_schedule; + // Check all the message that are in the shared inbox + while let Some(message) = scheduled.public.inbox.take_message() { + match message { + Message::Data(message) => { + // Check if we need to reroute, or can just put it + // in the private inbox of the connector + if let Some(other_connector_id) = scheduled.router.should_reroute(&message) { + self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message)); + } else { + scheduled.connector.inbox.insert_message(message); + } + }, + Message::Control(message) => { + match message.content { + ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { + // Need to change port target + let port = self.global.ports.get(&connector_key, port_id); + port.peer_connector = new_target_connector_id; + debug_assert!(delta_state.outbox.is_empty()); - // TODO: Check inbox for new message + // And respond with an Ack + self.send_message_and_wake_up_if_sleeping( + message.sender, + Message::Control(ControlMessage{ + id: message.id, + sender: connector_key.downcast(), + content: ControlMessageVariant::Ack, + }) + ); + }, + ControlMessageVariant::Ack => { + scheduled.router.handle_ack(message.id); + } + } + } + } + } - if connector.is_in_sync_mode() { + // Actually run the connector + let new_schedule; + if scheduled.connector.is_in_sync_mode() { // In synchronous mode, so we can expect messages being sent, // but we never expect the creation of connectors - new_schedule = connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state); + new_schedule = scheduled.connector.run_in_speculative_mode(self.code.as_ref(), &mut delta_state); debug_assert!(delta_state.new_connectors.is_empty()); - - if !delta_state.outbox.is_empty() { - // There are message to send - for message in delta_state.outbox.drain(..) { - let (inbox_message, target_connector_id) = { - // Note: retrieving a port incurs a read lock - let sending_port = self.global.ports.get(&connector_key, message.sending_port); - ( - InboxMessage { - sending_port: sending_port.self_id, - receiving_port: sending_port.peer_id, - sender_prev_branch_id: message.sender_prev_branch_id, - sender_cur_branch_id: message.sender_cur_branch_id, - message: message.message, - }, - sending_port.peer_connector, - ) - }; - - let target_connector = self.global.connectors.get_shared(target_connector_id); - target_connector.inbox.insert_message(inbox_message); - - // TODO: Check silent state. Queue connector if it was silent - } - } } else { // In regular running mode (not in a sync block) we cannot send // messages but we can create new connectors - new_schedule = connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state); + new_schedule = scheduled.connector.run_in_deterministic_mode(self.code.as_ref(), &mut delta_state); debug_assert!(delta_state.outbox.is_empty()); + } - if !delta_state.new_connectors.is_empty() { - // Push all connectors into the global state and queue them - // for execution - for connector in delta_state.new_connectors.drain(..) { - // Create connector, modify all of the ports that - // it now owns, then queue it for execution - let connector_key = self.global.connectors.create(connector); - + // Handle all of the output from the current run: messages to + // send and connectors to instantiate. + self.handle_delta_state(&connector_key, &mut delta_state); + + cur_schedule = new_schedule; + } + + // If here then the connector does not require immediate execution. + // So enqueue it if requested, and otherwise put it in a sleeping + // state. + match cur_schedule { + ConnectorScheduling::Immediate => unreachable!(), + ConnectorScheduling::Later => { + // Simply queue it again later + self.global.connector_queue.push_back(connector_key); + }, + ConnectorScheduling::NotNow => { + // Need to sleep, note that we are the only ones which are + // allows to set the sleeping state to `true`, and since + // we're running it must currently be `false`. + debug_assert_eq!(scheduled.public.sleeping.load(Ordering::Acquire), false); + scheduled.public.sleeping.store(true, Ordering::Release); + + // We might have received a message in the meantime from a + // thread that did not see the sleeping flag set to `true`, + // so: + if !scheduled.public.inbox.is_empty() { + let should_reschedule_self = scheduled.public.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_reschedule_self { self.global.connector_queue.push_back(connector_key); } } } + } + } + } - cur_schedule = new_schedule; + fn handle_delta_state(&mut self, connector_key: &ConnectorKey, delta_state: &mut RunDeltaState) { + // Handling any messages that were sent + if !delta_state.outbox.is_empty() { + for message in delta_state.outbox.drain(..) { + let (inbox_message, target_connector_id) = { + let sending_port = self.global.ports.get(&connector_key, message.sending_port); + ( + DataMessage { + sending_connector: connector_key.downcast(), + sending_port: sending_port.self_id, + receiving_port: sending_port.peer_id, + sender_prev_branch_id: message.sender_prev_branch_id, + sender_cur_branch_id: message.sender_cur_branch_id, + message: message.message, + }, + sending_port.peer_connector, + ) + }; + + self.send_message_and_wake_up_if_sleeping(target_connector_id, Message::Data(inbox_message)); + } + } + + // Handling any new connectors that were scheduled + // TODO: Pool outgoing messages to reduce atomic access + if !delta_state.new_connectors.is_empty() { + let cur_connector = self.global.connectors.get_mut(connector_key); + + for new_connector in delta_state.new_connectors.drain(..) { + // Add to global registry to obtain key + let new_key = self.global.connectors.create(new_connector); + let new_connector = self.global.connectors.get_mut(&new_key); + + // Each port should be lost by the connector that created the + // new one. Note that the creator is the current owner. + for port_id in &new_connector.ports.owned_ports { + debug_assert!(!cur_connector.ports.owned_ports.contains(port_id)); + + // Modify ownership, retrieve peer connector + let (peer_connector_id, peer_port_id) = { + let mut port = self.global.ports.get(connector_key, *port_id); + port.owning_connector = new_key.downcast(); + + (port.peer_connector, port.peer_id) + }; + + // Send message that port has changed ownership + let reroute_message = cur_connector.router.prepare_reroute( + port_id, peer_port_id, connector_key.downcast(), peer_connector_id, new_key.downcast() + ); + + self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message); + } + + // Schedule new connector to run + self.global.connector_queue.push_back(new_key); } } } + + pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { + let connector = self.global.connectors.get_shared(connector_id); + + connector.inbox.insert_message(message); + let should_wake_up = connector.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_wake_up { + let key = unsafe { ConnectorKey::from_id(connector_id) }; + self.global.connector_queue.push_back(key); + } + } +} + +/// Represents a rerouting entry due to a moved port +// TODO: Optimize +struct ReroutedTraffic { + id: u32, // ID of control message + port: PortIdLocal, // targeted port + source_connector: ConnectorId, // connector we expect messages from + target_connector: ConnectorId, // connector they should be rerouted to +} + +pub(crate) struct Router { + id_counter: u32, + active: Vec, +} + +impl Router { + pub fn new() -> Self { + Router{ + id_counter: 0, + active: Vec::new(), + } + } + + /// Prepares rerouting messages due to changed ownership of a port. The + /// control message returned by this function must be sent to the + /// transferred port's peer connector. + pub fn prepare_reroute( + &mut self, + port_id: PortIdLocal, peer_port_id: PortIdLocal, + self_connector_id: ConnectorId, peer_connector_id: ConnectorId, + new_owner_connector_id: ConnectorId + ) -> Message { + let id = self.id_counter; + self.id_counter.overflowing_add(1); + + self.active.push(ReroutedTraffic{ + id, + port: port_id, + source_connector: peer_connector_id, + target_connector: new_owner_connector_id, + }); + + return Message::Control(ControlMessage{ + id, + sender: self_connector_id, + content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id) + }); + } + + /// Returns true if the supplied message should be rerouted. If so then this + /// function returns the connector that should retrieve this message. + pub fn should_reroute(&self, message: &DataMessage) -> Option { + for reroute in &self.active { + if reroute.source_connector == message.sending_connector && + reroute.port == message.sending_port { + // Need to reroute this message + return Some(reroute.target_connector); + } + } + + return None; + } + + /// Handles an Ack as an answer to a previously sent control message + pub fn handle_ack(&mut self, id: u32) { + let index = self.active.iter() + .position(|v| v.id == id); + + match index { + Some(index) => { self.active.remove(index); }, + None => { todo!("handling of nefarious ACKs"); }, + } + } } \ No newline at end of file