diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index af9724fc4040d2aae15a500acdfcc49b4f73c19b..62481340e1b64cfaf42180b4dc1041409f4d733b 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -174,7 +174,7 @@ impl ProtocolDescription { let module_root = module_root.unwrap(); let root = &self.heap[module_root]; - let definition_id = root.get_definition_ident(&heap, identifier); + let definition_id = root.get_definition_ident(&self.heap, identifier); if definition_id.is_none() { return Err(ComponentCreationError::DefinitionDoesntExist); } @@ -210,7 +210,7 @@ impl ProtocolDescription { // By now we're sure that all of the arguments are correct. So create // the connector. return Ok(ComponentState{ - prompt: Prompt::new(&self.types, &self.heap, def, 0, arguments), + prompt: Prompt::new(&self.types, &self.heap, definition_id, 0, arguments), }); } @@ -232,24 +232,18 @@ impl ProtocolDescription { fn verify_same_type(&self, expected: &ConcreteType, expected_idx: usize, arguments: &ValueGroup, argument: &Value) -> bool { use ConcreteTypePart as CTP; - macro_rules! 
match_variant { - ($value:expr, $variant:expr) => { - if let $variant(_) = $value { true } else { false } - }; - } - match &expected.parts[expected_idx] { CTP::Void | CTP::Message | CTP::Slice | CTP::Function(_, _) | CTP::Component(_, _) => unreachable!(), - CTP::Bool => match_variant!(argument, Value::Bool), - CTP::UInt8 => match_variant!(argument, Value::UInt8), - CTP::UInt16 => match_variant!(argument, Value::UInt16), - CTP::UInt32 => match_variant!(argument, Value::UInt32), - CTP::UInt64 => match_variant!(argument, Value::UInt64), - CTP::SInt8 => match_variant!(argument, Value::SInt8), - CTP::SInt16 => match_variant!(argument, Value::SInt16), - CTP::SInt32 => match_variant!(argument, Value::SInt32), - CTP::SInt64 => match_variant!(argument, Value::SInt64), - CTP::Character => match_variant!(argument, Value::Char), + CTP::Bool => if let Value::Bool(_) = argument { true } else { false }, + CTP::UInt8 => if let Value::UInt8(_) = argument { true } else { false }, + CTP::UInt16 => if let Value::UInt16(_) = argument { true } else { false }, + CTP::UInt32 => if let Value::UInt32(_) = argument { true } else { false }, + CTP::UInt64 => if let Value::UInt64(_) = argument { true } else { false }, + CTP::SInt8 => if let Value::SInt8(_) = argument { true } else { false }, + CTP::SInt16 => if let Value::SInt16(_) = argument { true } else { false }, + CTP::SInt32 => if let Value::SInt32(_) = argument { true } else { false }, + CTP::SInt64 => if let Value::SInt64(_) = argument { true } else { false }, + CTP::Character => if let Value::Char(_) = argument { true } else { false }, CTP::String => { // Match outer string type and embedded character types if let Value::String(heap_pos) = argument { @@ -277,8 +271,8 @@ impl ProtocolDescription { return false; } }, - CTP::Input => match_variant!(argument, Value::Input), - CTP::Output => match_variant!(argument, Value::Output), + CTP::Input => if let Value::Input(_) = argument { true } else { false }, + CTP::Output => if let 
Value::Output(_) = argument { true } else { false }, CTP::Instance(_definition_id, _num_embedded) => { todo!("implement full type checking on user-supplied arguments"); return false; diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 22e82638e9e0b1d5468aae9428b89ec9c63248ec..42546271a1f6f01dfc042f0e78615f0446a12171 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -1,18 +1,16 @@ use std::collections::HashMap; -use std::ops::Deref; use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::global_store::ConnectorKey; -use crate::runtime2::inbox::{MessageContents, OutgoingMessage, SolutionMessage}; +use crate::runtime2::inbox::{MessageContents, SolutionMessage}; use crate::runtime2::native::Connector; -use crate::runtime2::port::PortKind; +use crate::runtime2::port::{Port, PortKind}; use crate::runtime2::scheduler::ConnectorCtx; use super::global_store::ConnectorId; use super::inbox::{ - PrivateInbox, PublicInbox, OutgoingDataMessage, DataMessage, SyncMessage, + PrivateInbox, PublicInbox, DataMessage, SyncMessage, SyncBranchConstraint, SyncConnectorSolution }; use super::port::PortIdLocal; @@ -41,7 +39,7 @@ impl BranchId { } } -#[derive(PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq)] pub(crate) enum SpeculativeState { // Non-synchronous variants RunningNonSync, // regular execution of code @@ -141,7 +139,7 @@ impl PortAssignment { } } -#[derive(Clone, Eq)] +#[derive(Clone)] struct PortOwnershipDelta { acquired: bool, // if false, then released ownership port_id: PortIdLocal, @@ -227,7 +225,7 @@ impl ConnectorPorts { #[inline] fn get_port_index(&self, port_id: PortIdLocal) -> Option { for (idx, port) in self.owned_ports.iter().enumerate() { - if port == port_id { + if *port == port_id { return Some(idx) } } @@ -250,7 +248,7 @@ impl ConnectorPorts { #[inline] fn 
get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment { let mapped_idx = self.mapped_index(branch_idx, port_idx); - return &mut self.port_mapping(mapped_idx); + return &mut self.port_mapping[mapped_idx]; } #[inline] @@ -323,7 +321,7 @@ pub(crate) struct ConnectorPDL { sync_active: BranchQueue, sync_pending_get: BranchQueue, sync_finished: BranchQueue, - sync_finished_last_handled: u32, + sync_finished_last_handled: u32, // TODO: Change to BranchId? cur_round: u32, // Port/message management pub committed_to: Option<(ConnectorId, u64)>, @@ -365,7 +363,7 @@ impl Connector for ConnectorPDL { fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { if self.in_sync { - let scheduling = self.run_in_speculative_mode(pd, ctx, results); + let scheduling = self.run_in_speculative_mode(pd, ctx, delta_state); // When in speculative mode we might have generated new sync // solutions, we need to turn them into proposed solutions here. @@ -389,13 +387,13 @@ impl Connector for ConnectorPDL { // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed let solution_message = self.generate_initial_solution_for_branch(branch_id, ctx); if let Some(valid_solution) = solution_message { - self.submit_sync_solution(valid_solution, ctx, results); + self.submit_sync_solution(valid_solution, ctx, delta_state); } else { // Branch is actually invalid, but we only just figured // it out. 
We need to mark it as invalid to prevent // future use Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id); - if branch_id == self.sync_finished_last_handled { + if branch_id.index == self.sync_finished_last_handled { self.sync_finished_last_handled = self.sync_finished.last; } @@ -414,7 +412,7 @@ impl Connector for ConnectorPDL { return scheduling; } else { - let scheduling = self.run_in_deterministic_mode(pd, ctx, results); + let scheduling = self.run_in_deterministic_mode(pd, ctx, delta_state); return scheduling; } } @@ -456,7 +454,7 @@ impl ConnectorPDL { /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate. pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) { debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed - debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints + debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints // TODO: Optimize, use some kind of temp workspace vector let mut execution_path_branch_ids = Vec::new(); @@ -513,7 +511,7 @@ impl ConnectorPDL { let port_index = port_index.unwrap(); let mapping = self.ports.get_port(branch_index, port_index); - if mapping.last_registered_branch_id != expected_branch_id { + if mapping.last_registered_branch_id != *expected_branch_id { // Not the expected interaction on this port, constraint not satisfied continue 'branch_loop; } @@ -571,9 +569,9 @@ impl ConnectorPDL { }; match new_solution.add_or_check_constraint(peer_connector_id, constraint) { - None => continue 'branch_loop, - Some(false) => continue 'branch_loop, - Some(true) => {}, + Err(_) => continue 'branch_loop, + Ok(false) => continue 'branch_loop, + Ok(true) => {}, } } @@ -599,7 +597,7 @@ impl ConnectorPDL { // Already committed to something. 
So will commit to this if it // takes precedence over the current solution message.comparison_number > *previous_comparison || - (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_comparison.0) + (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_origin.0) }, None => { // Not yet committed to a solution, so commit to this one @@ -844,7 +842,7 @@ impl ConnectorPDL { // Put in run results for thread to pick up and transfer to // the correct connector inbox. port_mapping.mark_definitive(branch.index, 1); - let message = OutgoingDataMessage { + let message = DataMessage{ sending_port: local_port_id, sender_prev_branch_id: BranchId::new_invalid(), sender_cur_branch_id: branch.index, @@ -858,7 +856,7 @@ impl ConnectorPDL { Self::release_ports_during_sync(&mut self.ports, branch, &results.ports); results.ports.clear(); - results.outbox.push(OutgoingMessage::Data(message)); + results.outbox.push(MessageContents::Data(message)); return ConnectorScheduling::Immediate } else { branch.sync_state = SpeculativeState::Inconsistent; @@ -948,16 +946,16 @@ impl ConnectorPDL { // linked lists inside of the vector of branches. /// Pops from front of linked-list branch queue. 
- fn pop_branch_from_queue(branches: &mut Vec, queue: &mut BranchQueue) -> &mut Branch { + fn pop_branch_from_queue<'a>(branches: &'a mut Vec, queue: &mut BranchQueue) -> &'a mut Branch { debug_assert!(queue.first != 0); let branch = &mut branches[queue.first as usize]; - *queue.first = branch.next_branch_in_queue.unwrap_or(0); + queue.first = branch.next_branch_in_queue.unwrap_or(0); branch.next_branch_in_queue = None; - if *queue.first == 0 { + if queue.first == 0 { // No more entries in queue - debug_assert_eq!(*queue.last, branch.index.index); - *queue.last = 0; + debug_assert_eq!(queue.last, branch.index.index); + queue.last = 0; } return branch; @@ -970,17 +968,17 @@ impl ConnectorPDL { debug_assert!(to_push.is_valid()); let to_push = to_push.index; - if *queue.last == 0 { + if queue.last == 0 { // No branches in the queue at all - debug_assert_eq!(*queue.first, 0); + debug_assert_eq!(queue.first, 0); branches[to_push as usize].next_branch_in_queue = None; - *queue.first = to_push; - *queue.last = to_push; + queue.first = to_push; + queue.last = to_push; } else { // Pre-existing branch in the queue - debug_assert_ne!(*queue.first, 0); - branches[*queue.last as usize].next_branch_in_queue = Some(to_push); - *queue.last = to_push; + debug_assert_ne!(queue.first, 0); + branches[queue.last as usize].next_branch_in_queue = Some(to_push); + queue.last = to_push; } } @@ -1050,6 +1048,7 @@ impl ConnectorPDL { /// Releasing ownership of ports during a sync-session. Will provide an /// error if the port was already used during a sync block. 
fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + todo!("unfinished: add port properties during final solution-commit msgs"); debug_assert!(branch.index.is_valid()); // branch in sync mode for port_id in port_ids { @@ -1064,7 +1063,7 @@ impl ConnectorPDL { } for delta in &branch.ports_delta { - if delta.port_id == port_id { + if delta.port_id == *port_id { // We cannot have acquired this port, because the // call to `ports.get_port_index` returned an index. debug_assert!(!delta.acquired); @@ -1100,6 +1099,7 @@ impl ConnectorPDL { /// Acquiring ports during a sync-session. fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { + todo!("unfinished: add port properties during final solution-commit msgs"); debug_assert!(branch.index.is_valid()); // branch in sync mode 'port_loop: for port_id in port_ids { @@ -1175,7 +1175,7 @@ impl ConnectorPDL { // sender and one for the receiver, ensuring it was not used. // TODO: This will fail if a port is passed around multiple times. // maybe a special "passed along" entry in `ports_delta`. - if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) { + if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)).unwrap() { return None; } @@ -1202,7 +1202,7 @@ impl ConnectorPDL { SyncBranchConstraint::SilentPort(port.peer_id) }; - if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() { + if !sync_message.add_or_check_constraint(port.peer_connector, constraint).unwrap() { return None; } } @@ -1268,6 +1268,7 @@ pub(crate) struct RunDeltaState { // state changes and try to apply them. 
pub outbox: Vec, pub new_connectors: Vec, + pub new_ports: Vec, // Workspaces pub ports: Vec, } @@ -1279,11 +1280,13 @@ impl RunDeltaState { RunDeltaState{ outbox: Vec::with_capacity(64), new_connectors: Vec::new(), + new_ports: Vec::new(), ports: Vec::with_capacity(64), } } } +#[derive(Eq, PartialEq)] pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately Later, // Schedule for running, at some later point in time @@ -1300,7 +1303,7 @@ pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Ve // This is an actual port let cur_port = PortIdLocal::new(port_id.0.u32_suffix); for prev_port in ports.iter() { - if prev_port == cur_port { + if *prev_port == cur_port { // Already added return; } diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs index 3b62801458e6cdc055fd273866a2f921da1e3ed0..4e7b9f621ed03fe587d80c4186d72249417ca598 100644 --- a/src/runtime2/global_store.rs +++ b/src/runtime2/global_store.rs @@ -1,18 +1,16 @@ use std::ptr; -use std::sync::{Arc, Barrier, RwLock, RwLockReadGuard}; +use std::sync::{Arc, RwLock}; use std::sync::atomic::{AtomicBool, AtomicU32}; use crate::collections::{MpmcQueue, RawVec}; use super::connector::{ConnectorPDL, ConnectorPublic}; -use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel}; -use super::inbox::PublicInbox; use super::scheduler::Router; use crate::ProtocolDescription; use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState}; -use crate::runtime2::inbox::{DataMessage, MessageContents, SyncMessage}; -use crate::runtime2::native::Connector; +use crate::runtime2::inbox::MessageContents; +use crate::runtime2::native::{Connector, ConnectorApplication}; use crate::runtime2::scheduler::ConnectorCtx; /// A kind of token that, once obtained, allows mutable access to a connector. @@ -40,7 +38,7 @@ impl ConnectorKey { /// A kind of token that allows shared access to a connector. 
Multiple threads /// may hold this -#[derive(Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub(crate) struct ConnectorId(pub u32); impl ConnectorId { @@ -121,7 +119,7 @@ impl ConnectorStore { unsafe { let connector = lock.connectors.get(connector_id.0 as usize); debug_assert!(!connector.is_null()); - return &*connector.public; + return &(**connector).public; } } @@ -133,16 +131,56 @@ impl ConnectorStore { unsafe { let connector = lock.connectors.get_mut(key.index as usize); debug_assert!(!connector.is_null()); - return *connector as &mut _; + return &mut (**connector); } } + pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey { + // Connector interface does not own any initial ports, and cannot be + // created by another connector + let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector))); + return key; + } + /// Create a new connector, returning the key that can be used to retrieve /// and/or queue it. The caller must make sure that the constructed /// connector's code is initialized with the same ports as the ports in the /// `initial_ports` array. 
- pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant, initial_ports: Vec) -> ConnectorKey { + pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey { + let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector)); + let new_connector = self.get_mut(&key); + + // Transferring ownership of ports (and crashing if there is a + // programmer's mistake in port management) + match &new_connector.connector { + ConnectorVariant::UserDefined(connector) => { + for port_id in &connector.ports.owned_ports { + let mut port = created_by.context.remove_port(*port_id); + new_connector.context.add_port(port); + } + }, + ConnectorVariant::Native(_) => unreachable!(), + } + + return key; + } + + pub(crate) fn destroy(&self, key: ConnectorKey) { + let lock = self.inner.write().unwrap(); + + unsafe { + let connector = lock.connectors.get_mut(key.index as usize); + ptr::drop_in_place(*connector); + // Note: but not deallocating! 
+ } + + lock.free.push(key.index as usize); + } + + /// Creates a connector but does not set its initial ports + fn create_connector_raw(&self, connector: ConnectorVariant) -> ConnectorKey { // Creation of the connector in the global store, requires a lock + let index; { let lock = self.inner.write().unwrap(); let connector = ScheduledConnector { @@ -152,7 +190,6 @@ impl ConnectorStore { router: Router::new(), }; - let index; if lock.free.is_empty() { let connector = Box::into_raw(Box::new(connector)); @@ -172,37 +209,14 @@ impl ConnectorStore { } } - // Setting of new connector's ID + // Generate key and retrieve the connector to set its ID let key = ConnectorKey{ index: index as u32 }; let new_connector = self.get_mut(&key); new_connector.context.id = key.downcast(); - // Transferring ownership of ports (and crashing if there is a - // programmer's mistake in port management) - match &new_connector.connector { - ConnectorVariant::UserDefined(connector) => { - for port_id in &connector.ports.owned_ports { - let mut port = created_by.context.remove_port(*port_id); - new_connector.context.add_port(port); - } - }, - ConnectorVariant::Native(_) => {}, // no initial ports (yet!) - } - + // Return the connector key return key; } - - pub(crate) fn destroy(&self, key: ConnectorKey) { - let lock = self.inner.write().unwrap(); - - unsafe { - let connector = lock.connectors.get_mut(key.index as usize); - ptr::drop_in_place(*connector); - // Note: but not deallocating! - } - - lock.free.push(key.index as usize); - } } impl Drop for ConnectorStore { diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index fbc1130d314a9db50de959332eee57d02652bef8..b07d021190240a8c06e000f0bc4398e19bddb273 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -13,11 +13,11 @@ within a certain sync-round. 
**/ use std::collections::VecDeque; -use std::sync::{RwLock, RwLockReadGuard, Mutex}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Mutex; use crate::protocol::eval::ValueGroup; -use super::connector::{BranchId, PortIdLocal}; +use super::connector::BranchId; +use super::port::PortIdLocal; use super::global_store::ConnectorId; /// A message that has been delivered (after being imbued with the receiving @@ -143,7 +143,7 @@ impl SyncMessage { match constraint { SyncBranchConstraint::SilentPort(silent_port_id) => { for (port_id, mapped_id) in &entry.final_port_mapping { - if port_id == silent_port_id { + if *port_id == silent_port_id { // If silent, then mapped ID is invalid return Ok(!mapped_id.is_valid()) } @@ -167,6 +167,7 @@ impl SyncMessage { } } +#[derive(Clone)] pub struct SolutionMessage { pub comparison_number: u64, pub connector_origin: ConnectorId, @@ -176,11 +177,13 @@ pub struct SolutionMessage { /// A control message. These might be sent by the scheduler to notify eachother /// of asynchronous state changes. +#[derive(Clone)] pub struct ControlMessage { pub id: u32, // generic identifier, used to match request to response pub content: ControlMessageVariant, } +#[derive(Clone)] pub enum ControlMessageVariant { ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port Ack, // acknowledgement of previous control message, matching occurs through control message ID. 
@@ -258,7 +261,7 @@ impl PrivateInbox { for existing in self.messages.iter() { if existing.sender_prev_branch_id == message.sender_prev_branch_id && existing.sender_cur_branch_id == message.sender_cur_branch_id && - existing.receiving_port == message.receiving_port { + existing.sending_port == message.sending_port { // Message was already received return; } @@ -313,10 +316,10 @@ pub struct InboxMessageIter<'i> { match_prev_branch_id: BranchId, } -impl<'m: 'i, 'i> Iterator for InboxMessageIter<'i> { - type Item = &'m DataMessage; +impl<'i> Iterator for InboxMessageIter<'i> { + type Item = &'i DataMessage; - fn next(&'m mut self) -> Option { + fn next(&mut self) -> Option { // Loop until match is found or at end of messages while self.next_index < self.max_index { let cur_message = &self.messages[self.next_index]; diff --git a/src/runtime2/messages.rs b/src/runtime2/messages.rs index fde4cd812a4004c39cc59875c754e1ceb15a3a6c..8949314f4d03424c8098df0893a138866003ceb9 100644 --- a/src/runtime2/messages.rs +++ b/src/runtime2/messages.rs @@ -1,14 +1,10 @@ -use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::HashMap; -use crate::common::Id; use crate::PortId; use crate::protocol::*; use crate::protocol::eval::*; -use super::connector::{BranchId, PortIdLocal}; - /// A message residing in a connector's inbox (waiting to be put into some kind /// of speculative branch), or a message waiting to be sent. 
#[derive(Clone)] diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 8ac3156cbb45147d161cbe4457f58ea181a64fe1..e199ab5e454d13ea8067111623dfb14e30526cf1 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -17,13 +17,10 @@ use std::sync::{Arc, Mutex}; use std::sync::atomic::Ordering; use std::thread::{self, JoinHandle}; -use crate::protocol::eval::*; -use crate::{common::Id, PortId, ProtocolDescription}; +use crate::ProtocolDescription; use global_store::{ConnectorVariant, GlobalStore}; use scheduler::Scheduler; -use crate::protocol::ComponentCreationError; -use connector::{Branch, ConnectorPDL, find_ports_in_value_group}; use native::{ConnectorApplication, ApplicationInterface}; @@ -42,6 +39,10 @@ pub(crate) struct RuntimeInner { schedulers: Mutex>>, // TODO: Revise, make exit condition something like: all interfaces dropped } +// TODO: Come back to this at some point +unsafe impl Send for RuntimeInner {} +unsafe impl Sync for RuntimeInner {} + impl Runtime { pub fn new(num_threads: usize, protocol_description: ProtocolDescription) -> Runtime { // Setup global state @@ -56,8 +57,9 @@ impl Runtime { { let mut schedulers = Vec::with_capacity(num_threads); for _ in 0..num_threads { - let mut scheduler = Scheduler::new(runtime_inner.clone()); + let cloned_runtime_inner = runtime_inner.clone(); let thread = thread::spawn(move || { + let mut scheduler = Scheduler::new(cloned_runtime_inner); scheduler.run(); }); @@ -76,9 +78,7 @@ impl Runtime { /// created. pub fn create_interface(&self) -> ApplicationInterface { let (connector, mut interface) = ConnectorApplication::new(self.inner.clone()); - let connector = Box::new(connector); - - let connector_key = self.global_store.connectors.create(ConnectorVariant::Native(connector)); + let connector_key = self.inner.global_store.connectors.create_interface(connector); interface.set_connector_id(connector_key.downcast()); // Note that we're not scheduling. 
That is done by the interface in case diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 04285976db5961cf363f5ff5ca65a4907151da43..a436d529b9cbca6154208bfe01de9df3dcbd857b 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -1,21 +1,20 @@ use std::sync::{Arc, Mutex, Condvar}; -use std::cell::Cell; use std::sync::atomic::Ordering; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; use crate::runtime2::connector::{Branch, find_ports_in_value_group}; -use crate::runtime2::global_store::{ConnectorKey, GlobalStore}; +use crate::runtime2::global_store::ConnectorKey; use crate::runtime2::inbox::MessageContents; use crate::runtime2::port::{Port, PortKind}; use crate::runtime2::scheduler::ConnectorCtx; use super::RuntimeInner; -use super::global_store::{ConnectorVariant, ConnectorId}; +use super::global_store::ConnectorId; use super::port::{Channel, PortIdLocal}; use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState}; -use super::inbox::{Message, DataMessage, SyncMessage}; +use super::inbox::Message; /// Generic connector interface from the scheduler's point of view. 
pub trait Connector { @@ -32,6 +31,7 @@ type SyncDone = Arc<(Mutex, Condvar)>; type JobQueue = Arc>>; enum ApplicationJob { + NewChannel((Port, Port)), NewConnector(ConnectorPDL), } @@ -63,6 +63,11 @@ impl Connector for ConnectorApplication { let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop() { match job { + ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { + delta_state.new_ports.reserve(2); + delta_state.new_ports.push(endpoint_a); + delta_state.new_ports.push(endpoint_b); + } ApplicationJob::NewConnector(connector) => { delta_state.new_connectors.push(connector); } @@ -80,11 +85,11 @@ pub struct ApplicationInterface { job_queue: JobQueue, runtime: Arc, connector_id: ConnectorId, - owned_ports: Vec, + owned_ports: Vec, } impl ApplicationInterface { - pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { + pub(crate) fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { return Self{ sync_done, job_queue, runtime, connector_id: ConnectorId::new_invalid(), @@ -99,19 +104,31 @@ impl ApplicationInterface { let putter_id = PortIdLocal::new(getter_id + 1); let getter_id = PortIdLocal::new(getter_id); - self.owned_ports.push(Port{ + // Create ports and add a job such that they are transferred to the + // API component. 
(note that we do not send a ping, this is only + // necessary once we create a connector) + let getter_port = Port{ self_id: getter_id, peer_id: putter_id, kind: PortKind::Getter, peer_connector: self.connector_id, - }); - - self.owned_ports.push(Port{ + }; + let putter_port = Port{ self_id: putter_id, peer_id: getter_id, kind: PortKind::Putter, peer_connector: self.connector_id, - }); + }; + + { + let mut lock = self.job_queue.lock().unwrap(); + lock.push(ApplicationJob::NewChannel((getter_port, putter_port))); + } + + // Add to owned ports for error checking while creating a connector + self.owned_ports.reserve(2); + self.owned_ports.push(putter_id); + self.owned_ports.push(getter_id); return Channel{ putter_id, getter_id }; } @@ -130,7 +147,7 @@ impl ApplicationInterface { match self.owned_ports.iter().position(|v| v == port_to_remove) { Some(index_to_remove) => { // We own the port, so continue - self.owned_ports.remove(index_to_remove) + self.owned_ports.remove(index_to_remove); }, None => { // We don't own the port @@ -150,7 +167,12 @@ impl ApplicationInterface { // Send ping message to wake up connector let connector = self.runtime.global_store.connectors.get_shared(self.connector_id); - connector.inbox.insert_message(Message::Ping); + connector.inbox.insert_message(Message{ + sending_connector: ConnectorId::new_invalid(), + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Ping, + }); + let should_wake_up = connector.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) .is_ok(); diff --git a/src/runtime2/port.rs b/src/runtime2/port.rs index 28401fd338b82e433674f41f09ff437f93095e93..88d22e8574a1b103bd2b83c95bbf43a337d5bad0 100644 --- a/src/runtime2/port.rs +++ b/src/runtime2/port.rs @@ -1,6 +1,6 @@ use super::global_store::ConnectorId; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub(crate) struct PortIdLocal { pub index: u32, } @@ -21,6 +21,7 @@ impl 
PortIdLocal { } } +#[derive(Eq, PartialEq)] pub enum PortKind { Putter, Getter, diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 233e0c0c7c0b02205d8b021eff89ef8010d7d50c..8f2ac8ae030d9f193e325dd32274a788ef182fe0 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,20 +1,18 @@ use std::sync::Arc; -use std::sync::Condvar; use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Duration; use std::thread; -use crate::ProtocolDescription; use crate::runtime2::global_store::ConnectorVariant; use crate::runtime2::inbox::MessageContents; use crate::runtime2::native::Connector; -use crate::runtime2::port::{Channel, PortKind, PortOwnership}; +use crate::runtime2::port::{Channel, PortKind}; use super::RuntimeInner; use super::port::{Port, PortIdLocal}; -use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant}; -use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; -use super::global_store::{ConnectorKey, ConnectorId, GlobalStore}; +use super::inbox::{Message, ControlMessage, ControlMessageVariant}; +use super::connector::{ConnectorScheduling, RunDeltaState}; +use super::global_store::{ConnectorKey, ConnectorId}; /// Contains fields that are mostly managed by the scheduler, but may be /// accessed by the connector @@ -29,7 +27,7 @@ impl ConnectorCtx { Self{ id: ConnectorId::new_invalid(), port_counter, - ports: initial_ports, + ports: Vec::new(), } } @@ -131,53 +129,46 @@ impl Scheduler { while cur_schedule == ConnectorScheduling::Immediate { // Check all the message that are in the shared inbox while let Some(message) = scheduled.public.inbox.take_message() { - match message.contents { - MessageContents::Data(content) => { - // Check if we need to reroute, or can just put it - // in the private inbox of the connector - if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, content.sending_port) { - 
self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(content)); - } else { - scheduled.connector.insert_data_message(content); - } - } - MessageContents::Sync(content) => { - scheduled.connector.insert_sync_message(content, &scheduled.context, &mut delta_state); - } - MessageContents::Solution(content) => { - // TODO: Handle solution message - }, - MessageContents::Control(content) => { - match content.content { - ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { - // Need to change port target - let port = scheduled.context.get_port_mut(port_id); - port.peer_connector = new_target_connector_id; - debug_assert!(delta_state.outbox.is_empty()); - - // And respond with an Ack - // Note: after this code has been reached, we may not have any - // messages in the outbox that send to the port whose owning - // connector we just changed. This is because the `ack` will - // clear the rerouting entry of the `ack`-receiver. - self.send_message_and_wake_up_if_sleeping( - content.sender, - Message{ - sending_connector: connector_key.downcast(), - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Control(ControlMessage{ - id: content.id, - content: ControlMessageVariant::Ack, - }), - } - ); - }, - ControlMessageVariant::Ack => { - scheduled.router.handle_ack(content.id); - } + // Check for rerouting + if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) { + self.send_message_and_wake_up_if_sleeping(other_connector_id, message); + continue; + } + + // Check for messages that requires special action from the + // scheduler. 
+ if let MessageContents::Control(content) = message.contents { + match content.content { + ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { + // Need to change port target + let port = scheduled.context.get_port_mut(port_id); + port.peer_connector = new_target_connector_id; + debug_assert!(delta_state.outbox.is_empty()); + + // And respond with an Ack + // Note: after this code has been reached, we may not have any + // messages in the outbox that send to the port whose owning + // connector we just changed. This is because the `ack` will + // clear the rerouting entry of the `ack`-receiver. + self.send_message_and_wake_up_if_sleeping( + message.sending_connector, + Message{ + sending_connector: connector_key.downcast(), + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Control(ControlMessage{ + id: content.id, + content: ControlMessageVariant::Ack, + }), + } + ); + }, + ControlMessageVariant::Ack => { + scheduled.router.handle_ack(content.id); } } - Message::Ping => {}, + } else { + // Let connector handle message + scheduled.connector.handle_message(message.contents, &scheduled.context, &mut delta_state); } } @@ -252,7 +243,7 @@ impl Scheduler { let message = Message{ sending_connector: connector_id, receiving_port: PortIdLocal::new_invalid(), - contents: contents.clone(), + contents: MessageContents::ConfirmCommit(contents.clone()), }; self.send_message_and_wake_up_if_sleeping(*to_visit, message); } @@ -277,6 +268,12 @@ impl Scheduler { } } + if !delta_state.new_ports.is_empty() { + for port in delta_state.new_ports.drain(..) { + context.ports.push(port); + } + } + // Handling any new connectors that were scheduled // TODO: Pool outgoing messages to reduce atomic access if !delta_state.new_connectors.is_empty() { @@ -284,7 +281,7 @@ impl Scheduler { for new_connector in delta_state.new_connectors.drain(..) 
{ // Add to global registry to obtain key - let new_key = self.runtime.global_store.connectors.create(cur_connector, ConnectorVariant::UserDefined(new_connector)); + let new_key = self.runtime.global_store.connectors.create_pdl(cur_connector, new_connector); let new_connector = self.runtime.global_store.connectors.get_mut(&new_key); // Call above changed ownership of ports, but we still have to @@ -296,7 +293,7 @@ impl Scheduler { port.peer_connector, new_connector.context.id ); - self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message); + self.send_message_and_wake_up_if_sleeping(port.peer_connector, reroute_message); } // Schedule new connector to run @@ -305,7 +302,7 @@ impl Scheduler { } } - pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { + fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { let connector = self.runtime.global_store.connectors.get_shared(connector_id); connector.inbox.insert_message(message); @@ -324,7 +321,7 @@ impl Scheduler { // TODO: Optimize struct ReroutedTraffic { id: u32, // ID of control message - port: PortIdLocal, // targeted port + target_port: PortIdLocal, // targeted port source_connector: ConnectorId, // connector we expect messages from target_connector: ConnectorId, // connector they should be rerouted to } @@ -356,23 +353,27 @@ impl Router { self.active.push(ReroutedTraffic{ id, - port: port_id, + target_port: port_id, source_connector: peer_connector_id, target_connector: new_owner_connector_id, }); - return Message::Control(ControlMessage{ - id, - content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id) - }); + return Message{ + sending_connector: self_connector_id, + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Control(ControlMessage{ + id, + content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id), + }) + }; } /// Returns true if 
the supplied message should be rerouted. If so then this /// function returns the connector that should retrieve this message. - pub fn should_reroute(&self, sending_connector: ConnectorId, sending_port: PortIdLocal) -> Option<ConnectorId> { + pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option<ConnectorId> { for reroute in &self.active { if reroute.source_connector == sending_connector && - reroute.port == sending_port { + reroute.target_port == target_port { // Need to reroute this message return Some(reroute.target_connector); }