diff --git a/src/collections/raw_vec.rs b/src/collections/raw_vec.rs index 89ca6c2ac19bfc99e5ec1dad7609213e0dd2df70..0abf43a06ac1d39f500751c698e40150ded015b4 100644 --- a/src/collections/raw_vec.rs +++ b/src/collections/raw_vec.rs @@ -34,7 +34,7 @@ impl RawVec { pub fn with_capacity(capacity: usize) -> Self { // Could be done a bit more efficiently let mut result = Self::new(); - result.ensure_space(capacity); + result.ensure_space(capacity).unwrap(); return result; } @@ -49,7 +49,7 @@ impl RawVec { } pub fn push(&mut self, item: T) { - self.ensure_space(1); + self.ensure_space(1).unwrap(); unsafe { let target = self.base.add(self.len); std::ptr::write(target, item); @@ -87,7 +87,7 @@ impl RawVec { dealloc(old_base, old_layout); } - self.base = new_base; + self.base = new_base as *mut T; self.cap = new_cap; } } // else: still enough space @@ -114,7 +114,7 @@ impl Drop for RawVec { debug_assert!(!self.base.is_null()); let (_, layout) = self.current_layout(); unsafe { - dealloc(self.base, layout); + dealloc(self.base as *mut u8, layout); if cfg!(debug_assertions) { self.base = ptr::null_mut(); } diff --git a/src/protocol/parser/type_table.rs b/src/protocol/parser/type_table.rs index c0ba521348fed41e6ca2996099bcb86f6128b16a..19721bb9ccb96028eb8f9d485923b417863b1995 100644 --- a/src/protocol/parser/type_table.rs +++ b/src/protocol/parser/type_table.rs @@ -1119,7 +1119,6 @@ impl TypeTable { let mut breadcrumb = self.type_loop_breadcrumbs[breadcrumb_idx].clone(); let poly_type = self.lookup.get(&breadcrumb.definition_id).unwrap(); - let poly_type_definition_id = poly_type.ast_definition; let resolve_result = match &poly_type.definition { DTV::Enum(_) => { diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 42546271a1f6f01dfc042f0e78615f0446a12171..47ddc84473aeb08641da02f3cb67ff2b56f388e2 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -4,7 +4,7 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::inbox::{MessageContents, SolutionMessage}; +use crate::runtime2::inbox::{Message, MessageContents, SolutionMessage}; use crate::runtime2::native::Connector; use crate::runtime2::port::{Port, PortKind}; use crate::runtime2::scheduler::ConnectorCtx; @@ -19,7 +19,7 @@ use super::port::PortIdLocal; /// ID of `0` generally means "no branch" (e.g. no parent, or a port did not /// yet receive anything from any branch). #[derive(Clone, Copy, PartialEq, Eq)] -pub(crate) struct BranchId { +pub struct BranchId { pub index: u32, } @@ -145,6 +145,7 @@ struct PortOwnershipDelta { port_id: PortIdLocal, } +#[derive(Debug)] enum PortOwnershipError { UsedInInteraction(PortIdLocal), AlreadyGivenAway(PortIdLocal) @@ -158,7 +159,7 @@ pub(crate) struct ConnectorPorts { // Contains P*B entries, where P is the number of ports and B is the number // of branches. One can find the appropriate mapping of port p at branch b // at linear index `b*P+p`. - pub port_mapping: Vec + port_mapping: Vec } impl ConnectorPorts { @@ -188,7 +189,8 @@ impl ConnectorPorts { self.port_mapping.reserve(num_ports); for offset in 0..num_ports { let parent_port = &self.port_mapping[parent_base_idx + offset]; - self.port_mapping.push(parent_port.clone()); + let parent_port = parent_port.clone(); + self.port_mapping.push(parent_port); } } @@ -303,10 +305,10 @@ pub(crate) struct ConnectorPublic { } impl ConnectorPublic { - pub fn new() -> Self { + pub fn new(initialize_as_sleeping: bool) -> Self { ConnectorPublic{ inbox: PublicInbox::new(), - sleeping: AtomicBool::new(false), + sleeping: AtomicBool::new(initialize_as_sleeping), } } } @@ -349,14 +351,14 @@ impl RunContext for TempCtx { } impl Connector for ConnectorPDL { - fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { use MessageContents as MC; - match message { - MC::Data(message) => self.handle_data_message(message), - MC::Sync(message) => self.handle_sync_message(message, ctx, delta_state), - MC::RequestCommit(message) => self.handle_request_commit_message(message, ctx, delta_state), - MC::ConfirmCommit(message) => self.handle_confirm_commit_message(message, ctx, delta_state), + match message.contents { + MC::Data(content) => self.handle_data_message(message.receiving_port, content), + MC::Sync(content) => self.handle_sync_message(content, ctx, delta_state), + MC::RequestCommit(content) => self.handle_request_commit_message(content, ctx, delta_state), + MC::ConfirmCommit(content) => self.handle_confirm_commit_message(content, ctx, delta_state), MC::Control(_) | MC::Ping => {}, } } @@ -446,8 +448,8 @@ impl ConnectorPDL { // ------------------------------------------------------------------------- #[inline] - pub fn handle_data_message(&mut self, message: DataMessage) { - self.inbox.insert_message(message); + pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) { + self.inbox.insert_message(target_port, message); } /// Accepts a synchronous message and combines it with the locally stored @@ -577,6 +579,7 @@ impl ConnectorPDL { // If here, then the newly generated solution is completely // compatible. + let next_branch = branch.next_branch_in_queue; self.submit_sync_solution(new_solution, ctx, results); // Consider the next branch @@ -585,8 +588,8 @@ impl ConnectorPDL { break; } - debug_assert!(branch.next_branch_in_queue.is_some()); // because we cannot be at the end of the queue - branch_index = branch.next_branch_in_queue.unwrap(); + debug_assert!(next_branch.is_some()); // because we cannot be at the end of the queue + branch_index = next_branch.unwrap(); } } } @@ -629,7 +632,7 @@ impl ConnectorPDL { } } - fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) { // Make sure this is the message we actually committed to. As long as // we're running on a single machine this is fine. // TODO: Take care of nefarious peers @@ -683,7 +686,7 @@ impl ConnectorPDL { /// where it is the caller's responsibility to immediately take care of /// those changes. The return value indicates when (and if) the connector /// needs to be scheduled again. - pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(self.in_sync); debug_assert!(!self.sync_active.is_empty()); @@ -709,24 +712,35 @@ impl ConnectorPDL { let local_port_index = self.ports.get_port_index(local_port_id).unwrap(); debug_assert!(self.ports.owned_ports.contains(&local_port_id)); - let silent_branch = &*branch; - // Create a copied branch who will have the port set to firing - let firing_index = self.branches.len() as u32; - let mut firing_branch = Branch::new_sync_branching_from(firing_index, silent_branch); - self.ports.prepare_sync_branch(branch.index.index, firing_index); + // Create two copied branches, one silent and one firing + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + let parent_branch_id = branch.index; + let parent_branch = &self.branches[parent_branch_id.index as usize]; - let firing_port = self.ports.get_port_mut(firing_index, local_port_index); - firing_port.mark_speculative(1); + let silent_index = self.branches.len() as u32; + let firing_index = silent_index + 1; + + let silent_branch = Branch::new_sync_branching_from(silent_index, parent_branch); + self.ports.prepare_sync_branch(parent_branch.index.index, silent_index); - // Assign the old branch a silent value - let silent_port = self.ports.get_port_mut(silent_branch.index.index, local_port_index); + let firing_branch = Branch::new_sync_branching_from(firing_index, parent_branch); + self.ports.prepare_sync_branch(parent_branch.index.index, firing_index); + + // Assign the port values of the two new branches + let silent_port = self.ports.get_port_mut(silent_index, local_port_index); silent_port.mark_speculative(0); + let firing_port = self.ports.get_port_mut(firing_index, local_port_index); + firing_port.mark_speculative(1); + // Run both branches again - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch.index); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch.index); + let silent_branch_id = silent_branch.index; + self.branches.push(silent_branch); + let firing_branch_id = firing_branch.index; self.branches.push(firing_branch); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch_id); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch_id); return ConnectorScheduling::Immediate; }, @@ -755,7 +769,8 @@ impl ConnectorPDL { if is_valid_get { // Mark as a branching point for future messages branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch.index); + let branch_id = branch.index; + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id); // But if some messages can be immediately applied, do so // now. @@ -766,9 +781,10 @@ impl ConnectorPDL { did_have_messages = true; // For each message prepare a new branch to execute + let parent_branch = &self.branches[branch_id.index as usize]; let new_branch_index = self.branches.len() as u32; - let mut new_branch = Branch::new_sync_branching_from(new_branch_index, branch); - self.ports.prepare_sync_branch(branch.index.index, new_branch_index); + let mut new_branch = Branch::new_sync_branching_from(new_branch_index, parent_branch); + self.ports.prepare_sync_branch(branch_id.index, new_branch_index); let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index); port_mapping.last_registered_branch_id = message.sender_cur_branch_id; @@ -785,8 +801,9 @@ impl ConnectorPDL { // Schedule the new branch debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch.index); + let new_branch_id = new_branch.index; self.branches.push(new_branch); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id); } if did_have_messages { @@ -808,8 +825,9 @@ impl ConnectorPDL { } } + let branch_id = branch.index; branch.sync_state = SpeculativeState::ReachedSyncEnd; - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch.index); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch_id); }, RunResult::BranchPut(port_id, value_group) => { // Branch performed a `put` on a particualar port. @@ -853,7 +871,7 @@ impl ConnectorPDL { // ownership over them in this branch debug_assert!(results.ports.is_empty()); find_ports_in_value_group(&message.message, &mut results.ports); - Self::release_ports_during_sync(&mut self.ports, branch, &results.ports); + Self::release_ports_during_sync(&mut self.ports, branch, &results.ports).unwrap(); results.ports.clear(); results.outbox.push(MessageContents::Data(message)); @@ -875,7 +893,7 @@ impl ConnectorPDL { } /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); @@ -897,8 +915,9 @@ impl ConnectorPDL { // Prepare for sync execution and reschedule immediately self.in_sync = true; let first_sync_branch = Branch::new_sync_branching_from(1, branch); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch.index); + let first_sync_branch_id = first_sync_branch.index; self.branches.push(first_sync_branch); + Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch_id); return ConnectorScheduling::Later; }, @@ -1020,6 +1039,8 @@ impl ConnectorPDL { } prev_index = next_index; + let entry = &branches[next_index as usize]; + next_index = entry.next_branch_in_queue.unwrap_or(0); } // If here, then we didn't find the element @@ -1216,7 +1237,7 @@ impl ConnectorPDL { // TODO: Maybe another package for random? let comparison_number: u64 = unsafe { let mut random_array = [0u8; 8]; - getrandom::getrandom(&mut random_array); + getrandom::getrandom(&mut random_array).unwrap(); std::mem::transmute(random_array) }; diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs index 4e7b9f621ed03fe587d80c4186d72249417ca598..ded4be362c43d4da660cd1117837216957f33af9 100644 --- a/src/runtime2/global_store.rs +++ b/src/runtime2/global_store.rs @@ -3,88 +3,13 @@ use std::sync::{Arc, RwLock}; use std::sync::atomic::{AtomicBool, AtomicU32}; use crate::collections::{MpmcQueue, RawVec}; - -use super::connector::{ConnectorPDL, ConnectorPublic}; -use super::scheduler::Router; - use crate::ProtocolDescription; -use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState}; -use crate::runtime2::inbox::MessageContents; -use crate::runtime2::native::{Connector, ConnectorApplication}; -use crate::runtime2::scheduler::ConnectorCtx; - -/// A kind of token that, once obtained, allows mutable access to a connector. -/// We're trying to use move semantics as much as possible: the owner of this -/// key is the only one that may execute the connector's code. -pub(crate) struct ConnectorKey { - pub index: u32, // of connector -} - -impl ConnectorKey { - /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable - /// access, to a "regular ID" which can be used to obtain immutable access. - #[inline] - pub fn downcast(&self) -> ConnectorId { - return ConnectorId(self.index); - } - - /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it - /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system - #[inline] - pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { - return ConnectorKey{ index: id.0 }; - } -} -/// A kind of token that allows shared access to a connector. Multiple threads -/// may hold this -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub(crate) struct ConnectorId(pub u32); +use super::scheduler::{Router, ConnectorCtx}; +use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; +use super::inbox::Message; +use super::native::{Connector, ConnectorApplication}; -impl ConnectorId { - // TODO: Like the other `new_invalid`, maybe remove - #[inline] - pub fn new_invalid() -> ConnectorId { - return ConnectorId(u32::MAX); - } - - #[inline] - pub(crate) fn is_valid(&self) -> bool { - return self.0 != u32::MAX; - } -} - -// TODO: Change this, I hate this. But I also don't want to put `public` and -// `router` of `ScheduledConnector` back into `Connector`. The reason I don't -// want `Box` everywhere is because of the v-table overhead. But -// to truly design this properly I need some benchmarks. -pub enum ConnectorVariant { - UserDefined(ConnectorPDL), - Native(Box), -} - -impl Connector for ConnectorVariant { - fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { - match self { - ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state), - ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state), - } - } - - fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { - match self { - ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state), - ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state), - } - } -} - -pub struct ScheduledConnector { - pub connector: ConnectorVariant, // access by connector - pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector - pub public: ConnectorPublic, // accessible by all schedulers and connectors - pub router: Router, -} /// The registry containing all connectors. The idea here is that when someone /// owns a `ConnectorKey`, then one has unique access to that connector. @@ -138,16 +63,17 @@ impl ConnectorStore { pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey { // Connector interface does not own any initial ports, and cannot be // created by another connector - let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector))); + let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)), true); return key; } /// Create a new connector, returning the key that can be used to retrieve /// and/or queue it. The caller must make sure that the constructed /// connector's code is initialized with the same ports as the ports in the - /// `initial_ports` array. + /// `initial_ports` array. Furthermore the connector is initialized as not + /// sleeping, so MUST be put on the connector queue by the caller. pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey { - let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector)); + let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector), false); let new_connector = self.get_mut(&key); // Transferring ownership of ports (and crashing if there is a @@ -166,7 +92,7 @@ impl ConnectorStore { } pub(crate) fn destroy(&self, key: ConnectorKey) { - let lock = self.inner.write().unwrap(); + let mut lock = self.inner.write().unwrap(); unsafe { let connector = lock.connectors.get_mut(key.index as usize); @@ -178,26 +104,23 @@ impl ConnectorStore { } /// Creates a connector but does not set its initial ports - fn create_connector_raw(&self, connector: ConnectorVariant) -> ConnectorKey { + fn create_connector_raw(&self, connector: ConnectorVariant, initialize_as_sleeping: bool) -> ConnectorKey { // Creation of the connector in the global store, requires a lock let index; { - let lock = self.inner.write().unwrap(); + let mut lock = self.inner.write().unwrap(); let connector = ScheduledConnector { connector, context: ConnectorCtx::new(self.port_counter.clone()), - public: ConnectorPublic::new(), + public: ConnectorPublic::new(initialize_as_sleeping), router: Router::new(), }; if lock.free.is_empty() { let connector = Box::into_raw(Box::new(connector)); - unsafe { - // Cheating a bit here. Anyway, move to heap, store in list - index = lock.connectors.len(); - lock.connectors.push(connector); - } + index = lock.connectors.len(); + lock.connectors.push(connector); } else { index = lock.free.pop().unwrap(); @@ -239,14 +162,14 @@ impl Drop for ConnectorStore { /// TODO: @docs /// TODO: @Optimize, very lazy implementation of concurrent datastructures. /// This includes the `should_exit` and `did_exit` pair! -pub struct GlobalStore { +pub(crate) struct GlobalStore { pub connector_queue: MpmcQueue, pub connectors: ConnectorStore, pub should_exit: AtomicBool, // signal threads to exit } impl GlobalStore { - pub fn new() -> Self { + pub(crate) fn new() -> Self { Self{ connector_queue: MpmcQueue::with_capacity(256), connectors: ConnectorStore::with_capacity(256), diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index b07d021190240a8c06e000f0bc4398e19bddb273..b14bbafdc12a4c6d71d73634fd83bd1bcecad9f9 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -238,11 +238,11 @@ impl PublicInbox { } } -pub struct PrivateInbox { +pub(crate) struct PrivateInbox { // "Normal" messages, intended for a PDL protocol. These need to stick // around during an entire sync-block (to handle `put`s for which the // corresponding `get`s have not yet been reached). - messages: Vec, + messages: Vec<(PortIdLocal, DataMessage)>, len_read: usize, } @@ -257,17 +257,17 @@ impl PrivateInbox { /// Will insert the message into the inbox. Only exception is when the tuple /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then /// nothing is inserted.. - pub fn insert_message(&mut self, message: DataMessage) { - for existing in self.messages.iter() { + pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, message: DataMessage) { + for (existing_target_port, existing) in self.messages.iter() { if existing.sender_prev_branch_id == message.sender_prev_branch_id && existing.sender_cur_branch_id == message.sender_cur_branch_id && - existing.sending_port == message.sending_port { + *existing_target_port == target_port { // Message was already received return; } } - self.messages.push(message); + self.messages.push((target_port, message)); } /// Retrieves all previously read messages that satisfy the provided @@ -278,7 +278,7 @@ impl PrivateInbox { /// This function should only be used to check if already-received messages /// could be received by a newly encountered `get` call in a connector's /// PDL code. - pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter { + pub(crate) fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter { return InboxMessageIter{ messages: &self.messages, next_index: 0, @@ -290,26 +290,26 @@ impl PrivateInbox { /// Retrieves the next unread message. Should only be called by the /// inbox-reader. - pub fn next_message(&mut self) -> Option<&DataMessage> { + pub(crate) fn next_message(&mut self) -> Option<&DataMessage> { if self.len_read == self.messages.len() { return None; } - let to_return = &self.messages[self.len_read]; + let (_, to_return) = &self.messages[self.len_read]; self.len_read += 1; return Some(to_return); } /// Simply empties the inbox - pub fn clear(&mut self) { + pub(crate) fn clear(&mut self) { self.messages.clear(); self.len_read = 0; } } /// Iterator over previously received messages in the inbox. -pub struct InboxMessageIter<'i> { - messages: &'i Vec, +pub(crate) struct InboxMessageIter<'i> { + messages: &'i Vec<(PortIdLocal, DataMessage)>, next_index: usize, max_index: usize, match_port_id: PortIdLocal, @@ -322,8 +322,8 @@ impl<'i> Iterator for InboxMessageIter<'i> { fn next(&mut self) -> Option { // Loop until match is found or at end of messages while self.next_index < self.max_index { - let cur_message = &self.messages[self.next_index]; - if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id { + let (target_port, cur_message) = &self.messages[self.next_index]; + if *target_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id { // Found a match break; } @@ -335,7 +335,7 @@ impl<'i> Iterator for InboxMessageIter<'i> { return None; } - let message = &self.messages[self.next_index]; + let (_, message) = &self.messages[self.next_index]; self.next_index += 1; return Some(message); } diff --git a/src/runtime2/messages.rs b/src/runtime2/messages.rs index 8949314f4d03424c8098df0893a138866003ceb9..e0273ef5ad58790624546ec061fce56770775e6a 100644 --- a/src/runtime2/messages.rs +++ b/src/runtime2/messages.rs @@ -2,7 +2,6 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use crate::PortId; -use crate::protocol::*; use crate::protocol::eval::*; /// A message residing in a connector's inbox (waiting to be put into some kind diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index e199ab5e454d13ea8067111623dfb14e30526cf1..b1acb13a1fd227c022ce0788916b0ed1b7663c23 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -14,7 +14,7 @@ mod inbox; // Imports use std::sync::{Arc, Mutex}; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicU32, Ordering}; use std::thread::{self, JoinHandle}; use crate::ProtocolDescription; @@ -24,19 +24,113 @@ use scheduler::Scheduler; use native::{ConnectorApplication, ApplicationInterface}; -// Runtime API -// TODO: Exit condition is very dirty. Take into account: -// - Connector hack with &'static references. May only destroy (unforced) if all connectors are done working -// - Running schedulers: schedulers need to be signaled that they should exit, then wait until all are done -// - User-owned interfaces: As long as these are owned user may still decide to create new connectors. +/// A kind of token that, once obtained, allows mutable access to a connector. +/// We're trying to use move semantics as much as possible: the owner of this +/// key is the only one that may execute the connector's code. +pub(crate) struct ConnectorKey { + pub index: u32, // of connector +} + +impl ConnectorKey { + /// Downcasts the `ConnectorKey` type, which can be used to obtain mutable + /// access, to a "regular ID" which can be used to obtain immutable access. + #[inline] + pub fn downcast(&self) -> ConnectorId { + return ConnectorId(self.index); + } + + /// Turns the `ConnectorId` into a `ConnectorKey`, marked as unsafe as it + /// bypasses the type-enforced `ConnectorKey`/`ConnectorId` system + #[inline] + pub unsafe fn from_id(id: ConnectorId) -> ConnectorKey { + return ConnectorKey{ index: id.0 }; + } +} + +/// A kind of token that allows shared access to a connector. Multiple threads +/// may hold this +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct ConnectorId(pub u32); + +impl ConnectorId { + // TODO: Like the other `new_invalid`, maybe remove + #[inline] + pub fn new_invalid() -> ConnectorId { + return ConnectorId(u32::MAX); + } + + #[inline] + pub(crate) fn is_valid(&self) -> bool { + return self.0 != u32::MAX; + } +} + +// TODO: Change this, I hate this. But I also don't want to put `public` and +// `router` of `ScheduledConnector` back into `Connector`. The reason I don't +// want `Box` everywhere is because of the v-table overhead. But +// to truly design this properly I need some benchmarks. +pub(crate) enum ConnectorVariant { + UserDefined(ConnectorPDL), + Native(Box), +} + +impl Connector for ConnectorVariant { + fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + match self { + ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state), + ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state), + } + } + + fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + match self { + ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state), + ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state), + } + } +} + +pub(crate) struct ScheduledConnector { + pub connector: ConnectorVariant, // access by connector + pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector + pub public: ConnectorPublic, // accessible by all schedulers and connectors + pub router: Router, +} + +/// Externally facing runtime. pub struct Runtime { inner: Arc, } pub(crate) struct RuntimeInner { - pub(crate) global_store: GlobalStore, + // Protocol pub(crate) protocol_description: ProtocolDescription, - schedulers: Mutex>>, // TODO: Revise, make exit condition something like: all interfaces dropped + // Storage of connectors in a kind of freelist. Note the vector of points to + // ensure pointer stability: the vector can be changed but the entries + // themselves remain valid. + pub connectors_list: RawVec<*mut ScheduledConnector>, + pub connectors_free: Vec, + + pub(crate) global_store: GlobalStore, + schedulers: Mutex>>, + active_interfaces: AtomicU32, // active API interfaces that can add connectors/channels +} + +impl RuntimeInner { + #[inline] + pub(crate) fn increment_active_interfaces(&self) { + let _old_num = self.active_interfaces.fetch_add(1, Ordering::SeqCst); + debug_assert_ne!(_old_num, 1); // once it hits 0, it stays zero + } + + pub(crate) fn decrement_active_interfaces(&self) { + let old_num = self.active_interfaces.fetch_sub(1, Ordering::SeqCst); + debug_assert!(old_num > 0); + if old_num == 1 { + // Became 0 + // TODO: Check num connectors, if 0, then set exit flag + } + } } // TODO: Come back to this at some point @@ -44,24 +138,28 @@ unsafe impl Send for RuntimeInner {} unsafe impl Sync for RuntimeInner {} impl Runtime { - pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime { + pub fn new(num_threads: u32, protocol_description: ProtocolDescription) -> Runtime { // Setup global state assert!(num_threads > 0, "need a thread to run connectors"); let runtime_inner = Arc::new(RuntimeInner{ global_store: GlobalStore::new(), protocol_description, schedulers: Mutex::new(Vec::new()), + active_interfaces: AtomicU32::new(1), // we are the active interface }); // Launch threads { - let mut schedulers = Vec::with_capacity(num_threads); - for _ in 0..num_threads { + let mut schedulers = Vec::with_capacity(num_threads as usize); + for thread_index in 0..num_threads { let cloned_runtime_inner = runtime_inner.clone(); - let thread = thread::spawn(move || { - let mut scheduler = Scheduler::new(cloned_runtime_inner); - scheduler.run(); - }); + let thread = thread::Builder::new() + .name(format!("thread-{}", thread_index)) + .spawn(move || { + let mut scheduler = Scheduler::new(cloned_runtime_inner, thread_index); + scheduler.run(); + }) + .unwrap(); schedulers.push(thread); } @@ -92,7 +190,7 @@ impl Drop for Runtime { self.inner.global_store.should_exit.store(true, Ordering::Release); let mut schedulers = self.inner.schedulers.lock().unwrap(); for scheduler in schedulers.drain(..) { - scheduler.join(); + scheduler.join().unwrap(); } } } \ No newline at end of file diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index a436d529b9cbca6154208bfe01de9df3dcbd857b..39885d6565633073652f17f267168f76323ea3a6 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -1,7 +1,8 @@ +use std::collections::VecDeque; use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::Ordering; -use crate::protocol::ComponentCreationError; +use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; use crate::runtime2::connector::{Branch, find_ports_in_value_group}; @@ -17,18 +18,18 @@ use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState}; use super::inbox::Message; /// Generic connector interface from the scheduler's point of view. -pub trait Connector { +pub(crate) trait Connector { /// Handle a new message (preprocessed by the scheduler). You probably only /// want to handle `Data`, `Sync`, and `Solution` messages. The others are /// intended for the scheduler itself. - fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState); + fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState); /// Should run the connector's behaviour up until the next blocking point. fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; } type SyncDone = Arc<(Mutex, Condvar)>; -type JobQueue = Arc>>; +type JobQueue = Arc>>; enum ApplicationJob { NewChannel((Port, Port)), @@ -45,7 +46,7 @@ pub struct ConnectorApplication { impl ConnectorApplication { pub(crate) fn new(runtime: Arc) -> (Self, ApplicationInterface) { let sync_done = Arc::new(( Mutex::new(false), Condvar::new() )); - let job_queue = Arc::new(Mutex::new(Vec::with_capacity(32))); + let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32))); let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone() }; let interface = ApplicationInterface::new(sync_done, job_queue, runtime); @@ -55,20 +56,31 @@ impl ConnectorApplication { } impl Connector for ConnectorApplication { - fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { - todo!("handling messages in ConnectorApplication (API for runtime)") + fn handle_message(&mut self, message: Message, _ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) { + use MessageContents as MC; + + match message.contents { + MC::Data(_) => unreachable!("data message in API connector"), + MC::Sync(_) | MC::RequestCommit(_) | MC::ConfirmCommit(_) => { + // Handling sync in API + }, + MC::Control(_) => {}, + MC::Ping => {}, + } } fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { let mut queue = self.job_queue.lock().unwrap(); - while let Some(job) = queue.pop() { + while let Some(job) = queue.pop_front() { match job { ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { + println!("DEBUG: API adopting ports"); delta_state.new_ports.reserve(2); delta_state.new_ports.push(endpoint_a); delta_state.new_ports.push(endpoint_b); } ApplicationJob::NewConnector(connector) => { + println!("DEBUG: API creating connector"); delta_state.new_connectors.push(connector); } } @@ -89,7 +101,9 @@ pub struct ApplicationInterface { } impl ApplicationInterface { - pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { + fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { + runtime.active_interfaces += 1; + return Self{ sync_done, job_queue, runtime, connector_id: ConnectorId::new_invalid(), @@ -122,7 +136,7 @@ impl ApplicationInterface { { let mut lock = self.job_queue.lock().unwrap(); - lock.push(ApplicationJob::NewChannel((getter_port, putter_port))); + lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port))); } // Add to owned ports for error checking while creating a connector @@ -162,7 +176,7 @@ impl ApplicationInterface { // Put on job queue { let mut queue = self.job_queue.lock().unwrap(); - queue.push(ApplicationJob::NewConnector(connector)); + queue.push_back(ApplicationJob::NewConnector(connector)); } // Send ping message to wake up connector @@ -178,8 +192,11 @@ impl ApplicationInterface { .is_ok(); if should_wake_up { + println!("DEBUG: Waking up connector"); let key = unsafe{ ConnectorKey::from_id(self.connector_id) }; self.runtime.global_store.connector_queue.push_back(key); + } else { + println!("DEBUG: NOT waking up connector"); } return Ok(()); @@ -196,11 +213,17 @@ impl ApplicationInterface { pub fn wait(&self) { let (is_done, condition) = &*self.sync_done; let lock = is_done.lock().unwrap(); - condition.wait_while(lock, |v| !*v); // wait while not done + condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done } /// Called by runtime to set associated connector's ID. pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { self.connector_id = id; } +} + +impl Drop for ApplicationInterface { + fn drop(&mut self) { + + } } \ No newline at end of file diff --git a/src/runtime2/port.rs b/src/runtime2/port.rs index 88d22e8574a1b103bd2b83c95bbf43a337d5bad0..e24e332c4c899afcd015e81898edb4e0ab224143 100644 --- a/src/runtime2/port.rs +++ b/src/runtime2/port.rs @@ -1,7 +1,7 @@ use super::global_store::ConnectorId; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub(crate) struct PortIdLocal { +pub struct PortIdLocal { pub index: u32, } diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 5e35dff82d1472beac90834a3438a579c455b7e5..ee19e6b5a17562f26a9d074195fb4bb18462b9d5 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -1062,7 +1062,7 @@ struct Context<'a> { } impl<'a> crate::protocol::RunContext for Context<'a> { - fn did_put(&mut self, port: PortId) -> bool { + fn did_put(&mut self, _port: PortId) -> bool { // Note that we want "did put" to return false if we have fired zero // times, because this implies we did a prevous let old_value = self.branch_ctx.just_called_did_put; diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 8f2ac8ae030d9f193e325dd32274a788ef182fe0..fc11f600d1c486a1f99272569a0437e58c25fb67 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -88,6 +88,7 @@ impl ConnectorCtx { pub(crate) struct Scheduler { runtime: Arc, + scheduler_id: u32, } // Thinking aloud: actual ports should be accessible by connector, but managed @@ -95,32 +96,37 @@ pub(crate) struct Scheduler { // only context, instead of an extra call on the "Connector" trait. impl Scheduler { - pub fn new(runtime: Arc) -> Self { - return Self{ runtime }; + pub fn new(runtime: Arc, scheduler_id: u32) -> Self { + return Self{ runtime, scheduler_id }; } pub fn run(&mut self) { // Setup global storage and workspaces that are reused for every // connector that we run + let scheduler_id = self.scheduler_id; let mut delta_state = RunDeltaState::new(); 'thread_loop: loop { // Retrieve a unit of work - let connector_key = self.runtime.global_store.connector_queue.pop_front(); - if connector_key.is_none() { + let mut connector_key = self.runtime.global_store.connector_queue.pop_front(); + while connector_key.is_none() { // TODO: @Performance, needs condition or something, and most // def' not sleeping + println!("DEBUG [{}]: Nothing to do", scheduler_id); thread::sleep(Duration::new(1, 0)); if self.runtime.global_store.should_exit.load(Ordering::Acquire) { // Thread exits! + println!("DEBUG [{}]: ... So I am quitting", scheduler_id); break 'thread_loop; } + println!("DEBUG [{}]: ... But I'm still running", scheduler_id); continue 'thread_loop; } // We have something to do let connector_key = connector_key.unwrap(); + println!("DEBUG [{}]: Running connector {}", scheduler_id, connector_key.index); let scheduled = self.runtime.global_store.connectors.get_mut(&connector_key); // Keep running until we should no longer immediately schedule the @@ -168,7 +174,7 @@ impl Scheduler { } } else { // Let connector handle message - scheduled.connector.handle_message(message.contents, &scheduled.context, &mut delta_state); + scheduled.connector.handle_message(message, &scheduled.context, &mut delta_state); } } @@ -349,7 +355,8 @@ impl Router { new_owner_connector_id: ConnectorId ) -> Message { let id = self.id_counter; - self.id_counter.overflowing_add(1); + let (new_id_counter, _) = self.id_counter.overflowing_add(1); + self.id_counter = new_id_counter; self.active.push(ReroutedTraffic{ id, diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 5681612271b828550a826118b066917dd7b1bc02..46dbf2abfc1db253f4f6414d548bba4bac9dae36 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -1,130 +1,57 @@ use std::sync::Arc; -use super::runtime::*; -use crate::ProtocolDescription; +use super::*; +use crate::{PortId, ProtocolDescription}; +use crate::common::Id; use crate::protocol::eval::*; -#[test] -fn test_single_message() { - // Simple test were we have a `putter` component, which will simply send a - // single message (a boolean), and a `getter` component, which will receive - // that message. - // We will write this behaviour in the various ways that the language - // currently allows. We will cheat a bit by peeking into the runtime to make - // sure that the getter actually received the message. - // TODO: Expose ports to a "native application" - - fn check_store_bool(value: &Value, expected: bool) { - if let Value::Bool(value) = value { - assert_eq!(*value, expected); - } else { - assert!(false); - } - } - - fn run_putter_getter(code: &[u8]) { - // Compile code - let pd = ProtocolDescription::parse(code) - .expect("code successfully compiles"); - let pd = Arc::new(pd); - - // Construct runtime and the appropriate ports and connectors - let mut rt = Runtime::new(pd); - let (put_port, get_port) = rt.add_channel(); - - let mut put_args = ValueGroup::new_stack(vec![ - put_port, - ]); - rt.add_component("", "putter", put_args) - .expect("'putter' component created"); - - let mut get_args = ValueGroup::new_stack(vec![ - get_port, - ]); - rt.add_component("", "getter", get_args) - .expect("'getter' component created"); - - // Run until completion - rt.run(); +fn runtime_for(num_threads: u32, pdl: &str) -> Runtime { + let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl"); + let runtime = Runtime::new(num_threads, protocol); - // Check for success (the 'received' and 'did_receive" flags) - let getter_component = rt.connectors.get(&1).unwrap(); - let branch = &getter_component.branches[0]; - assert_eq!(branch.branch_state, BranchState::Finished); + return runtime; +} - // Note: with the stack structure of the store, the first entry is the - // "previous stack pos" and the second one is the input port passed to - // the procedure. Hence the third/fourth entries are the boolean - // variables on the stack. - check_store_bool(&branch.code_state.prompt.store.stack[2], true); - check_store_bool(&branch.code_state.prompt.store.stack[3], true); +#[test] +fn test_put_and_get() { + let rt = runtime_for(4, " +primitive putter(out sender, u32 loops) { + u32 index = 0; + while (index < loops) { + synchronous { + print(\"putting!\"); + put(sender, true); + } + index += 1; } - - // Without `fires()`, just a single valid behaviour - run_putter_getter( - b"primitive putter(out put_here) { - synchronous { - put(put_here, true); - } +} + +primitive getter(in receiver, u32 loops) { + u32 index = 0; + while (index < loops) { + synchronous { + print(\"getting!\"); + auto result = get(receiver); + assert(result); } + index += 1; + } +} + "); - primitive getter(in get_here) { - bool received = false; - bool did_receive = false; - - synchronous { - received = get(get_here); - if (received) { - print(\"value was 'true'\"); - } else { - print(\"value was 'false'\"); - } - did_receive = true; - } - }"); - - // With `fires()`, but eliminating on the putter side - run_putter_getter( - b"primitive putter(out put_here) { - synchronous { - if (!fires(put_here)) { - assert(false); - } else { - put(put_here, true); - } - } - } + let mut api = rt.create_interface(); + let channel = api.create_channel(); + let num_loops = 5; - primitive getter(in get_here) { - bool received = false; bool did_receive = false; - synchronous { - if (fires(get_here)) { - received = get(get_here); - did_receive = true; - } - } - }"); + api.create_connector("", "putter", ValueGroup::new_stack(vec![ + Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })), + Value::UInt32(num_loops) + ])).expect("create putter"); - // With `fires()`, but eliminating on the getter side - run_putter_getter( - b"primitive putter(out put_here) { - synchronous { - if (fires(put_here)) { - put(put_here, true); - } - } - } + api.create_connector("", "getter", ValueGroup::new_stack(vec![ + Value::Input(PortId(Id{ connector_id: 0, u32_suffix: channel.getter_id.index })), + Value::UInt32(num_loops) + ])).expect("create getter"); - primitive getter(in get_here) { - bool received = false; bool did_receive = false; - synchronous { - if (fires(get_here)) { - received = get(get_here); - did_receive = true; - } else { - assert(false); - } - } - }" - ); + println!("Am I running?"); } \ No newline at end of file