From cf26538b25dccd4e637c8bb56ec2037faf7be8d5 2021-10-06 18:16:42
From: MH
Date: 2021-10-06 18:16:42
Subject: [PATCH] architecture for send/recv ports in place

---

diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs
index 518df447c90e662382a0aaa57b9afc856d5eada9..3f75f4875bc9236146b9d7e95b5b23d8769bf670 100644
--- a/src/runtime2/connector.rs
+++ b/src/runtime2/connector.rs
@@ -3,24 +3,8 @@ use std::collections::HashMap;
 use crate::{PortId, ProtocolDescription};
 use crate::protocol::{ComponentState, RunContext, RunResult};
 use crate::protocol::eval::{Prompt, Value, ValueGroup};
-use crate::runtime2::inbox::{Inbox, Message};
-
-#[derive(Clone, Copy, PartialEq, Eq)]
-pub(crate) struct PortIdLocal {
-    pub id: u32,
-}
-
-impl PortIdLocal {
-    pub fn new(id: u32) -> Self {
-        Self{ id }
-    }
-
-    // TODO: Unsure about this, maybe remove, then also remove all struct
-    // instances where I call this
-    pub fn new_invalid() -> Self {
-        Self{ id: u32::MAX }
-    }
-}
+use crate::runtime2::inbox::{Inbox, OutboxMessage};
+use crate::runtime2::port::PortIdLocal;
 
 /// Represents the identifier of a branch (the index within its container). An
 /// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
@@ -67,7 +51,7 @@ pub(crate) struct Branch {
     sync_state: SpeculativeState,
     next_branch_in_queue: Option,
     // Message/port state
-    inbox: HashMap, // TODO: @temporary, remove together with fires()
+    inbox: HashMap, // TODO: @temporary, remove together with fires()
     ports_delta: Vec,
 }
 
@@ -261,6 +245,23 @@ impl BranchQueue {
     }
 }
 
+/// Public fields of the connector that can be freely shared between multiple
+/// threads. Note that this is not enforced by the compiler. The global store
+/// allows retrieving the entire `Connector` as a mutable reference by one
+/// thread, and this `ConnectorPublic` by any number of threads.
+pub(crate) struct ConnectorPublic {
+    pub inbox: Inbox,
+}
+
+impl ConnectorPublic {
+    pub fn new() -> Self {
+        ConnectorPublic{
+            inbox: Inbox::new(),
+        }
+    }
+}
+
+// TODO: Maybe prevent false sharing by aligning `public` to the next cache line.
 pub(crate) struct Connector {
     // State and properties of connector itself
     id: u32,
@@ -271,8 +272,8 @@ pub(crate) struct Connector {
     sync_pending_get: BranchQueue,
     sync_finished: BranchQueue,
     // Port/message management
-    ports: ConnectorPorts,
-    inbox: Inbox,
+    pub ports: ConnectorPorts,
+    pub public: ConnectorPublic,
 }
 
 struct TempCtx {}
@@ -307,7 +308,7 @@ impl Connector {
             sync_pending_get: BranchQueue::new(),
             sync_finished: BranchQueue::new(),
             ports: ConnectorPorts::new(owned_ports),
-            inbox: Inbox::new(),
+            public: ConnectorPublic::new(),
         }
     }
 
@@ -441,7 +442,7 @@ impl Connector {
             },
             RunResult::BranchPut(port_id, value_group) => {
                 // Branch performed a `put` on a particular port.
-                let local_port_id = PortIdLocal{ id: port_id.0.u32_suffix };
+                let local_port_id = PortIdLocal{ index: port_id.0.u32_suffix };
                 let local_port_index = self.ports.get_port_index(local_port_id);
                 if local_port_index.is_none() {
                     todo!("handle case where port was received before (i.e. in ports_delta)")
@@ -470,9 +471,8 @@ impl Connector {
                 // Put in run results for thread to pick up and transfer to
                 // the correct connector inbox.
                 port_mapping.mark_definitive(branch.index, 1);
-                let message = Message{
+                let message = OutboxMessage {
                     sending_port: local_port_id,
-                    receiving_port: PortIdLocal::new_invalid(),
                     sender_prev_branch_id: BranchId::new_invalid(),
                     sender_cur_branch_id: branch.index,
                     message: value_group,
@@ -709,7 +709,7 @@ impl Connector {
 pub(crate) struct RunDeltaState {
     // Variables that allow the thread running the connector to pick up global
     // state changes and try to apply them.
-    pub outbox: Vec<Message>,
+    pub outbox: Vec<OutboxMessage>,
     pub new_connectors: Vec<Connector>,
     // Workspaces
     pub ports: Vec,
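The `OutboxMessage` constructed above deliberately carries no receiving port: the sending branch only knows the port it `put` on, and the scheduler later resolves the peer port and target connector from the port table (see the scheduler changes at the end of this patch). Below is a minimal, self-contained sketch of that routing step; the names `PortInfo` and `route` are illustrative stand-ins, not types from the patch.

    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    struct PortIdLocal { index: u32 }

    #[derive(Clone, Copy, Debug)]
    struct PortInfo { peer_id: PortIdLocal, peer_connector: u32 }

    // What a connector produces: it only knows the port it `put` on.
    struct OutboxMessage { sending_port: PortIdLocal, payload: &'static str }

    // What the runtime delivers: the receiving side is resolved via the port table.
    #[derive(Debug)]
    struct InboxMessage { sending_port: PortIdLocal, receiving_port: PortIdLocal, payload: &'static str }

    fn route(ports: &HashMap<PortIdLocal, PortInfo>, out: OutboxMessage) -> (u32, InboxMessage) {
        let info = ports[&out.sending_port];
        (info.peer_connector, InboxMessage {
            sending_port: out.sending_port,
            receiving_port: info.peer_id,
            payload: out.payload,
        })
    }

    fn main() {
        let putter = PortIdLocal { index: 0 };
        let getter = PortIdLocal { index: 1 };
        let mut ports = HashMap::new();
        ports.insert(putter, PortInfo { peer_id: getter, peer_connector: 7 });

        let (target, msg) = route(&ports, OutboxMessage { sending_port: putter, payload: "hi" });
        assert_eq!(target, 7);
        println!("deliver to connector {}: {:?}", target, msg);
    }

Keeping the outbox ignorant of the receiver is what allows a port's peer to change (e.g. while a port is in transit) without invalidating messages already queued by a branch.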
diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs
index ca7b4ea548b598fca2e83d3555780fed381fbefb..9b75ae2a4536113990a55afca44b831331b5e84f 100644
--- a/src/runtime2/global_store.rs
+++ b/src/runtime2/global_store.rs
@@ -1,51 +1,82 @@
 use crate::collections::{MpmcQueue, RawVec};
-use super::connector::Connector;
+use super::connector::{Connector, ConnectorPublic};
+use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel};
 
 use std::ptr;
-use std::sync::RwLock;
+use std::sync::{RwLock, RwLockReadGuard};
 
 /// A kind of token that, once obtained, allows access to a container.
 struct ConnectorKey {
     index: u32, // of connector
 }
 
+/// The registry containing all connectors. The idea here is that when someone
+/// owns a `ConnectorKey`, then one has unique access to that connector.
+/// Otherwise one has shared access.
+///
+/// The inner data is wrapped in a `RwLock`.
 struct ConnectorStore {
+    inner: RwLock<ConnectorStoreInner>,
+}
+
+struct ConnectorStoreInner {
     connectors: RawVec<*mut Connector>,
     free: Vec<usize>,
 }
 
 impl ConnectorStore {
     fn with_capacity(capacity: usize) -> Self {
-        Self{
-            connectors: RawVec::with_capacity(capacity),
-            free: Vec::with_capacity(capacity),
+        return Self{
+            inner: RwLock::new(ConnectorStoreInner {
+                connectors: RawVec::with_capacity(capacity),
+                free: Vec::with_capacity(capacity),
+            }),
+        };
+    }
+
+    /// Retrieves the shared members of the connector.
+    pub(crate) fn get_shared(&self, connector_id: u32) -> &'static ConnectorPublic {
+        let lock = self.inner.read().unwrap();
+
+        unsafe {
+            let connector = lock.connectors.get(connector_id as usize);
+            debug_assert!(!connector.is_null());
+            return &(**connector).public;
         }
     }
 
-    fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector {
+    /// Retrieves a particular connector. Only the thread that pulled the
+    /// associated key out of the execution queue should (be able to) call this.
+    pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut Connector {
+        let lock = self.inner.read().unwrap();
+
         unsafe {
-            let connector = self.connectors.get_mut(key.index as usize);
+            let connector = lock.connectors.get_mut(key.index as usize);
             debug_assert!(!connector.is_null());
             return &mut **connector;
         }
     }
 
-    fn create(&mut self, connector: Connector) -> ConnectorKey {
+    /// Creates a new connector, returning the key that can be used to
+    /// retrieve and/or queue it.
+    pub(crate) fn create(&self, connector: Connector) -> ConnectorKey {
+        let mut lock = self.inner.write().unwrap();
+
         let index;
-        if self.free.is_empty() {
+        if lock.free.is_empty() {
             let connector = Box::into_raw(Box::new(connector));
             unsafe {
                 // Cheating a bit here. Anyway, move to heap, store in list
-                index = self.connectors.len();
-                self.connectors.push(connector);
+                index = lock.connectors.len();
+                lock.connectors.push(connector);
             }
         } else {
-            index = self.free.pop().unwrap();
+            index = lock.free.pop().unwrap();
             unsafe {
-                let target = self.connectors.get_mut(index);
+                let target = lock.connectors.get_mut(index);
                 debug_assert!(!target.is_null());
                 ptr::write(*target, connector);
             }
@@ -54,79 +85,170 @@ impl ConnectorStore {
         return ConnectorKey{ index: index as u32 };
     }
 
-    fn destroy(&mut self, key: ConnectorKey) {
+    pub(crate) fn destroy(&self, key: ConnectorKey) {
+        let mut lock = self.inner.write().unwrap();
+
         unsafe {
-            let connector = self.connectors.get_mut(key.index as usize);
+            let connector = lock.connectors.get_mut(key.index as usize);
             ptr::drop_in_place(*connector); // Note: but not deallocating!
         }
 
-        self.free.push(key.index as usize);
+        lock.free.push(key.index as usize);
    }
 }
 
 impl Drop for ConnectorStore {
     fn drop(&mut self) {
-        for idx in 0..self.connectors.len() {
+        let lock = self.inner.write().unwrap();
+
+        for idx in 0..lock.connectors.len() {
             unsafe {
-                let memory = *self.connectors.get_mut(idx);
-                let boxed = Box::from_raw(memory); // takes care of deallocation
+                let memory = *lock.connectors.get_mut(idx);
+                let _ = Box::from_raw(memory); // takes care of deallocation
             }
         }
     }
 }
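Both `ConnectorStore` above and the `PortStore` below use the same slot-reuse scheme: `destroy` drops the value in place but keeps the allocation alive, pushing the slot index onto a free list that `create` pops before growing the backing vector. A safe, minimal sketch of the pattern, using `Vec<Option<T>>` in place of the patch's `RawVec<*mut T>` plus `ptr::write`:

    struct Slab<T> { slots: Vec<Option<T>>, free: Vec<usize> }

    impl<T> Slab<T> {
        fn new() -> Self { Slab { slots: Vec::new(), free: Vec::new() } }

        fn create(&mut self, value: T) -> usize {
            match self.free.pop() {
                Some(index) => { self.slots[index] = Some(value); index } // reuse a destroyed slot
                None => { self.slots.push(Some(value)); self.slots.len() - 1 }
            }
        }

        fn destroy(&mut self, index: usize) {
            self.slots[index] = None; // drop in place, keep the slot allocated
            self.free.push(index);
        }
    }

    fn main() {
        let mut slab = Slab::new();
        let a = slab.create("connector a");
        let b = slab.create("connector b");
        slab.destroy(a);
        let c = slab.create("connector c");
        assert_eq!(a, c); // the freed slot is reused, so indices stay stable
        assert!(b != c);
    }

Stable indices are what make `ConnectorKey { index }` usable as a long-lived handle into the store.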
-/// Global store of connectors, ports and queues that are used by the sceduler
-/// threads. The global store has the appearance of a thread-safe datatype, but
-/// one needs to be careful using it.
-///
-/// The intention of this data structure is to enforce the rules:
-/// TODO: @docs
-pub struct GlobalStore {
-    connector_queue: MpmcQueue<ConnectorKey>,
-    connectors: RwLock<ConnectorStore>,
+/// The registry of all ports.
+pub struct PortStore {
+    inner: RwLock<PortStoreInner>,
 }
 
-impl GlobalStore {
-    pub fn new() -> Self {
+struct PortStoreInner {
+    ports: RawVec<Port>,
+    free: Vec<usize>,
+}
+
+impl PortStore {
+    fn with_capacity(capacity: usize) -> Self {
         Self{
-            connector_queue: MpmcQueue::with_capacity(256),
-            connectors: RwLock::new(ConnectorStore::with_capacity(256)),
+            inner: RwLock::new(PortStoreInner{
+                ports: RawVec::with_capacity(capacity),
+                free: Vec::with_capacity(capacity),
+            }),
         }
     }
 
-    // Taking connectors out of global queue
+    pub(crate) fn get(&self, key: &ConnectorKey, port_id: PortIdLocal) -> PortRef {
+        let lock = self.inner.read().unwrap();
+        debug_assert!(port_id.is_valid());
+
+        unsafe {
+            let port = lock.ports.get_mut(port_id.index as usize);
+            let port = &mut *port;
+            // If these differ we have a race condition (which should never
+            // happen); the check is better than nothing.
+            debug_assert_eq!(port.owning_connector, key.index);
 
-    pub fn pop_key(&self) -> Option<ConnectorKey> {
-        return self.connector_queue.pop_front();
+            return PortRef{ lock, port };
+        }
     }
 
-    pub fn push_key(&self, key: ConnectorKey) {
-        self.connector_queue.push_back(key);
+    pub(crate) fn create_channel(&self, creating_connector: Option<u32>) -> Channel {
+        let mut lock = self.inner.write().unwrap();
+
+        // Reserves a new port. Doesn't point it to its counterpart yet.
+        fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: Option<u32>) -> u32 {
+            let index;
+            let ownership = if creating_connector.is_some() { PortOwnership::Owned } else { PortOwnership::Unowned };
+            let connector_id = creating_connector.unwrap_or(0);
+
+            if lock.free.is_empty() {
+                index = lock.ports.len() as u32;
+                lock.ports.push(Port{
+                    self_id: PortIdLocal::new(index),
+                    peer_id: PortIdLocal::new_invalid(),
+                    kind,
+                    ownership,
+                    owning_connector: connector_id,
+                    peer_connector: connector_id,
+                });
+            } else {
+                index = lock.free.pop().unwrap() as u32;
+                let port = unsafe{ &mut *lock.ports.get_mut(index as usize) };
+
+                port.peer_id = PortIdLocal::new_invalid();
+                port.kind = kind;
+                port.ownership = ownership;
+                port.owning_connector = connector_id;
+                port.peer_connector = connector_id;
+            }
+
+            return index;
+        }
+
+        // Create the ports
+        let putter_id = reserve_port(&mut lock, PortKind::Putter, creating_connector);
+        let getter_id = reserve_port(&mut lock, PortKind::Getter, creating_connector);
+        debug_assert_ne!(putter_id, getter_id);
+
+        // Point them to one another
+        unsafe {
+            let putter_port = &mut *lock.ports.get_mut(putter_id as usize);
+            let getter_port = &mut *lock.ports.get_mut(getter_id as usize);
+            putter_port.peer_id = getter_port.self_id;
+            getter_port.peer_id = putter_port.self_id;
+        }
+
+        return Channel{ putter_id, getter_id };
     }
+}
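`create_channel` reserves two ports and then welds them together by writing each port's ID into the other's `peer_id`. A self-contained sketch of that pairing step over a plain `Vec` (types simplified; the free list, ownership fields and locking are omitted):

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct PortIdLocal { index: u32 }
    const INVALID: PortIdLocal = PortIdLocal { index: u32::MAX };

    #[derive(Debug)]
    enum PortKind { Putter, Getter }

    #[derive(Debug)]
    struct Port { self_id: PortIdLocal, peer_id: PortIdLocal, kind: PortKind }

    fn create_channel(ports: &mut Vec<Port>) -> (u32, u32) {
        let reserve = |kind: PortKind, ports: &mut Vec<Port>| -> u32 {
            let index = ports.len() as u32;
            ports.push(Port { self_id: PortIdLocal { index }, peer_id: INVALID, kind });
            index
        };
        let putter_id = reserve(PortKind::Putter, ports);
        let getter_id = reserve(PortKind::Getter, ports);
        // Point the two halves at one another, as in the patch
        ports[putter_id as usize].peer_id = PortIdLocal { index: getter_id };
        ports[getter_id as usize].peer_id = PortIdLocal { index: putter_id };
        (putter_id, getter_id)
    }

    fn main() {
        let mut ports = Vec::new();
        let (p, g) = create_channel(&mut ports);
        assert_eq!(ports[p as usize].peer_id.index, g);
        assert_eq!(ports[g as usize].peer_id.index, p);
        println!("{:?}", ports);
    }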
+
+pub struct PortRef<'p> {
+    lock: RwLockReadGuard<'p, PortStoreInner>,
+    port: &'static mut Port,
+}
 
-    // Creating, retrieving and destroying connectors
+impl<'p> std::ops::Deref for PortRef<'p> {
+    type Target = Port;
 
-    /// Retrieves a connector using the provided key. Note that the returned
-    /// reference is not truly static, the `GlobalStore` needs to stay alive.
-    pub fn get_connector(&self, key: &ConnectorKey) -> &'static mut Connector {
-        let connectors = self.connectors.read().unwrap();
-        return connectors.get_mut(key);
+    fn deref(&self) -> &Self::Target {
+        return self.port;
     }
+}
 
-    /// Adds a connector to the global system. Will also queue it to run
-    pub fn add_connector(&self, connector: Connector) {
-        let key = {
-            let mut connectors = self.connectors.write().unwrap();
-            connectors.create(connector)
-        };
+impl<'p> std::ops::DerefMut for PortRef<'p> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        return self.port;
+    }
+}
+
+impl Drop for PortStore {
+    fn drop(&mut self) {
+        let lock = self.inner.write().unwrap();
+
+        // Very lazy code
+        for idx in 0..lock.ports.len() {
+            if lock.free.contains(&idx) {
+                continue;
+            }
 
-        self.connector_queue.push_back(key);
+            unsafe {
+                let port = lock.ports.get_mut(idx);
+                std::ptr::drop_in_place(port);
+            }
+        }
     }
+}
+
+/// Global store of connectors, ports and queues that are used by the
+/// scheduler threads. The global store has the appearance of a thread-safe
+/// datatype, but one needs to be careful using it.
+///
+/// TODO: @docs
+/// TODO: @Optimize, very lazy implementation of concurrent data structures.
+pub struct GlobalStore {
+    pub connector_queue: MpmcQueue<ConnectorKey>,
+    pub connectors: ConnectorStore,
+    pub ports: PortStore,
+}
 
-    /// Destroys a connector
-    pub fn destroy_connector(&self, key: ConnectorKey) {
-        let mut connectors = self.connectors.write().unwrap();
-        connectors.destroy(key);
+impl GlobalStore {
+    pub fn new() -> Self {
+        Self{
+            connector_queue: MpmcQueue::with_capacity(256),
+            connectors: ConnectorStore::with_capacity(256),
+            ports: PortStore::with_capacity(256),
+        }
     }
 }
\ No newline at end of file
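`PortRef` above is a guard-holding smart reference: it keeps the `PortStore` read-locked for as long as the reference is alive, and `Deref`/`DerefMut` forward to the port itself. A safe miniature of the same idea, using an index instead of the patch's `&'static mut Port`:

    use std::ops::Deref;
    use std::sync::{RwLock, RwLockReadGuard};

    struct Port { owning_connector: u32 }

    struct PortRef<'p> { guard: RwLockReadGuard<'p, Vec<Port>>, index: usize }

    impl<'p> Deref for PortRef<'p> {
        type Target = Port;
        fn deref(&self) -> &Port { &self.guard[self.index] }
    }

    fn get(store: &RwLock<Vec<Port>>, index: usize) -> PortRef<'_> {
        PortRef { guard: store.read().unwrap(), index }
    }

    fn main() {
        let store = RwLock::new(vec![Port { owning_connector: 3 }]);
        let port = get(&store, 0);
        // The store stays read-locked for as long as `port` is alive.
        assert_eq!(port.owning_connector, 3);
    }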
diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs
index 871ec8720c7256d6479bef3eb8c4bd7eb7838ada..d2d89a42df942e5275d196c058fc5b80cc2ddc00 100644
--- a/src/runtime2/inbox.rs
+++ b/src/runtime2/inbox.rs
@@ -1,73 +1,120 @@
-use crate::common::Ordering;
+use std::collections::VecDeque;
+use std::sync::{RwLock, RwLockReadGuard, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
+
 use crate::protocol::eval::ValueGroup;
 use crate::runtime2::connector::{BranchId, PortIdLocal};
 
-/// A message in transit from one connector to another.
+/// A message prepared by a connector, waiting to be picked up by the runtime
+/// and sent to another connector.
 #[derive(Clone)]
-pub struct Message {
+pub struct OutboxMessage {
     pub sending_port: PortIdLocal,
-    pub receiving_port: PortIdLocal,
     pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
     pub sender_cur_branch_id: BranchId, // always valid
     pub message: ValueGroup,
 }
 
+/// A message inserted into the inbox of a connector by the runtime.
+#[derive(Clone)]
+pub struct InboxMessage {
+    pub sending_port: PortIdLocal,
+    pub receiving_port: PortIdLocal,
+    pub sender_prev_branch_id: BranchId,
+    pub sender_cur_branch_id: BranchId,
+    pub message: ValueGroup,
+}
+
+/// A message sent between connectors to communicate something about their
+/// scheduling state.
+pub enum ControlMessage {
+    ChangePortPeer(u32, PortIdLocal, u32), // (control message ID, port to change, new peer connector ID)
+    Ack(u32), // (control message ID)
+}
+
 /// The inbox of a connector. The owning connector (i.e. the thread that is
 /// executing the connector) should be able to read all messages. Other
 /// connectors (potentially executed by different threads) should be able to
 /// append messages.
 ///
-/// Note that the logic inside of the inbox is strongly connected to deciding
-/// whether or not a connector has nothing to execute, and is waiting on new
-/// messages in order to continue.
+/// If a connector has no more code to run, and its inbox does not contain any
+/// new messages, then it may go into sleep mode.
+///
+// TODO: @Optimize, this is a temporary lazy implementation
 pub struct Inbox {
-    messages: Vec<Message>,
+    // "Normal" messages, intended for a PDL protocol. These need to stick
+    // around during an entire sync-block (to handle `put`s for which the
+    // corresponding `get`s have not yet been reached).
+    messages: RwLock<Vec<InboxMessage>>,
+    len_read: AtomicUsize,
+    // System messages. These are handled by the scheduler and only need to be
+    // handled once.
+    system_messages: Mutex<VecDeque<ControlMessage>>,
 }
 
 impl Inbox {
     pub fn new() -> Self {
-        Self{ messages: Vec::new() }
+        Self{
+            messages: RwLock::new(Vec::new()),
+            len_read: AtomicUsize::new(0),
+            system_messages: Mutex::new(VecDeque::new()),
+        }
     }
 
     /// Will insert the message into the inbox. The only exception is when the
     /// tuple (prev_branch_id, cur_branch_id, receiving_port_id) already
     /// exists; then nothing is inserted.
-    pub fn insert_message(&mut self, message: Message) {
-        match self.messages.binary_search_by(|a| Self::compare_messages(a, &message)) {
-            Ok(_) => {} // message already exists
-            Err(idx) => self.messages.insert(idx, message)
+    pub fn insert_message(&self, message: InboxMessage) {
+        let mut messages = self.messages.write().unwrap();
+        for existing in messages.iter() {
+            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
+                existing.sender_cur_branch_id == message.sender_cur_branch_id &&
+                existing.receiving_port == message.receiving_port {
+                // Message was already received
+                return;
+            }
         }
+        messages.push(message);
     }
 
-    /// Retrieves all messages for the provided conditions
-    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> &[Message] {
-        // Seek the first message with the appropriate port ID and branch ID
-        let num_messages = self.messages.len();
-
-        for first_idx in 0..num_messages {
-            let msg = &self.messages[first_idx];
-            if msg.receiving_port == port_id && msg.sender_prev_branch_id == prev_branch_id {
-                // Found a match, seek ahead until the condition is no longer true
-                let mut last_idx = first_idx + 1;
-                while last_idx < num_messages {
-                    let msg = &self.messages[last_idx];
-                    if msg.receiving_port != port_id || msg.sender_prev_branch_id != prev_branch_id {
-                        // No longer matching
-                        break;
-                    }
-                    last_idx += 1;
-                }
-
-                // Return all the matching messages
-                return &self.messages[first_idx..last_idx];
-            } else if msg.receiving_port.id > port_id.id {
-                // Because messages are ordered, this implies we couldn't find
-                // any message
-                break;
-            }
-        }
+    /// Retrieves all previously read messages that satisfy the provided
+    /// speculative conditions. Note that the inbox remains read-locked until
+    /// the returned iterator is dropped. Should only be called by the
+    /// inbox-reader (i.e. the thread executing a connector's PDL code).
+    ///
+    /// This function should only be used to check if already-received messages
+    /// could be received by a newly encountered `get` call in a connector's
+    /// PDL code.
+    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
+        let lock = self.messages.read().unwrap();
+        return InboxMessageIter{
+            lock,
+            next_index: 0,
+            max_index: self.len_read.load(Ordering::Acquire),
+            match_port_id: port_id,
+            match_prev_branch_id: prev_branch_id,
+        };
+    }
+
+    /// Retrieves the next unread message. Should only be called by the
+    /// inbox-reader.
+    pub fn next_message(&self) -> Option<InboxMessageRef> {
+        let lock = self.messages.read().unwrap();
+        let cur_index = self.len_read.load(Ordering::Acquire);
+        if cur_index >= lock.len() {
+            return None;
+        }
+
+        // TODO: Accept the correctness and simply make it an add, or even
+        // remove the atomic altogether.
+        if let Err(_) = self.len_read.compare_exchange(cur_index, cur_index + 1, Ordering::AcqRel, Ordering::Acquire) {
+            panic!("multiple readers modifying number of messages read");
+        }
 
-        return &self.messages[0..0];
+        return Some(InboxMessageRef{
+            lock,
+            index: cur_index,
+        });
     }
 
     /// Simply empties the inbox
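The new inbox splits its message vector into a "read" prefix and an "unread" tail, separated by the atomic `len_read` cursor: `insert_message` appends (rejecting duplicates), `next_message` hands out the next unread message and advances the cursor, and `get_messages` re-scans only the already-read prefix. A self-contained sketch of these two mechanisms, with message payloads reduced to their dedup key and the patch's single-reader `compare_exchange` simplified to a plain store:

    use std::sync::RwLock;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct Key { prev_branch: u32, cur_branch: u32, receiving_port: u32 }

    struct Inbox { messages: RwLock<Vec<Key>>, len_read: AtomicUsize }

    impl Inbox {
        fn insert_message(&self, key: Key) {
            let mut messages = self.messages.write().unwrap();
            if messages.iter().any(|m| *m == key) { return; } // already received
            messages.push(key);
        }

        fn next_message(&self) -> Option<Key> {
            let messages = self.messages.read().unwrap();
            let cursor = self.len_read.load(Ordering::Acquire);
            if cursor >= messages.len() { return None; }
            self.len_read.store(cursor + 1, Ordering::Release); // single reader assumed
            Some(messages[cursor])
        }
    }

    fn main() {
        let inbox = Inbox { messages: RwLock::new(Vec::new()), len_read: AtomicUsize::new(0) };
        let key = Key { prev_branch: 0, cur_branch: 1, receiving_port: 2 };
        inbox.insert_message(key);
        inbox.insert_message(key); // duplicate: ignored
        assert!(inbox.next_message().is_some());
        assert!(inbox.next_message().is_none()); // cursor advanced past the only message
    }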
@@ -75,15 +122,61 @@ impl Inbox {
         self.messages.clear();
     }
 
-    // Ordering by, consecutively, a) receiving port, b) prev branch id, c) cur
-    // branch id.
-    fn compare_messages(a: &Message, b: &Message) -> Ordering {
-        let mut ord = a.receiving_port.id.cmp(&b.receiving_port.id);
-        if ord != Ordering::Equal { return ord; }
+    pub fn insert_control_message(&self, message: ControlMessage) {
+        let mut lock = self.system_messages.lock().unwrap();
+        lock.push_back(message);
+    }
 
-        ord = a.sender_prev_branch_id.index.cmp(&b.sender_prev_branch_id.index);
-        if ord != Ordering::Equal { return ord; }
+    pub fn take_control_message(&self) -> Option<ControlMessage> {
+        let mut lock = self.system_messages.lock().unwrap();
+        return lock.pop_front();
+    }
+}
+
+/// Reference to a new message.
+pub struct InboxMessageRef<'i> {
+    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
+    index: usize,
+}
 
-        return a.sender_cur_branch_id.index.cmp(&b.sender_cur_branch_id.index);
+impl<'i> std::ops::Deref for InboxMessageRef<'i> {
+    type Target = InboxMessage;
+
+    fn deref(&self) -> &Self::Target {
+        return &self.lock[self.index];
     }
 }
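`ControlMessage::ChangePortPeer` carries a control-message ID that the receiving side is presumably expected to echo back in an `Ack`; the patch defines the messages but not yet their handling. A hypothetical sketch of such a handshake over the `Mutex<VecDeque>` queue (the handling loop and the queue wiring are assumptions, not code from this patch, and the `PortIdLocal` argument is reduced to a bare `u32`):

    use std::collections::VecDeque;
    use std::sync::Mutex;

    enum ControlMessage {
        ChangePortPeer(u32, u32, u32), // (control message ID, port index, new peer connector ID)
        Ack(u32),                      // (control message ID)
    }

    struct ControlQueue { messages: Mutex<VecDeque<ControlMessage>> }

    impl ControlQueue {
        fn insert(&self, m: ControlMessage) { self.messages.lock().unwrap().push_back(m); }
        fn take(&self) -> Option<ControlMessage> { self.messages.lock().unwrap().pop_front() }
    }

    fn main() {
        let to_peer = ControlQueue { messages: Mutex::new(VecDeque::new()) };
        let to_us = ControlQueue { messages: Mutex::new(VecDeque::new()) };

        // We ask the peer to point port 5 at connector 9, tagged with ID 1.
        to_peer.insert(ControlMessage::ChangePortPeer(1, 5, 9));

        // Scheduler loop on the peer: handle each control message exactly once.
        while let Some(message) = to_peer.take() {
            match message {
                ControlMessage::ChangePortPeer(id, _port, _new_peer) => {
                    // ... update the port table here, then acknowledge ...
                    to_us.insert(ControlMessage::Ack(id));
                }
                ControlMessage::Ack(_) => {}
            }
        }

        assert!(matches!(to_us.take(), Some(ControlMessage::Ack(1))));
    }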
+
+/// Iterator over previously received messages in the inbox.
+pub struct InboxMessageIter<'i> {
+    lock: RwLockReadGuard<'i, Vec<InboxMessage>>,
+    next_index: usize,
+    max_index: usize,
+    match_port_id: PortIdLocal,
+    match_prev_branch_id: BranchId,
+}
+
+impl<'i> Iterator for InboxMessageIter<'i> {
+    type Item = &'i InboxMessage;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // Loop until a match is found or we are at the end of the messages
+        while self.next_index < self.max_index {
+            let cur_message = &self.lock[self.next_index];
+            if cur_message.receiving_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
+                // Found a match
+                break;
+            }
+
+            self.next_index += 1;
+        }
+
+        if self.next_index == self.max_index {
+            return None;
+        }
+
+        // Safety: like `PortRef`, we hand out a reference that lives as long
+        // as the read guard we hold.
+        let message = unsafe { &*(&self.lock[self.next_index] as *const InboxMessage) };
+        self.next_index += 1;
+        return Some(message);
+    }
+}
\ No newline at end of file
diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs
index 87580e13e35739b790cbf350113887171b757b3f..296c8726b8d9714d8bac4dd0869912874ffd96a3 100644
--- a/src/runtime2/mod.rs
+++ b/src/runtime2/mod.rs
@@ -1,6 +1,7 @@
 mod runtime;
 mod messages;
 mod connector;
+mod port;
 mod global_store;
 mod scheduler;

diff --git a/src/runtime2/port.rs b/src/runtime2/port.rs
new file mode 100644
index 0000000000000000000000000000000000000000..df7a7d64c3d949f8f3bbd24aa610e51b9faa170b
--- /dev/null
+++ b/src/runtime2/port.rs
@@ -0,0 +1,53 @@
+#[derive(Clone, Copy, PartialEq, Eq)]
+pub(crate) struct PortIdLocal {
+    pub index: u32,
+}
+
+impl PortIdLocal {
+    pub fn new(id: u32) -> Self {
+        Self{ index: id }
+    }
+
+    // TODO: Unsure about this, maybe remove, then also remove all struct
+    // instances where I call this
+    pub fn new_invalid() -> Self {
+        Self{ index: u32::MAX }
+    }
+
+    pub fn is_valid(&self) -> bool {
+        return self.index != u32::MAX;
+    }
+}
+
+pub enum PortKind {
+    Putter,
+    Getter,
+}
+
+pub enum PortOwnership {
+    Unowned, // i.e. held by a native application
+    Owned,
+    InTransit,
+}
+
+/// Represents a port inside of the runtime. May be without owner if it is
+/// created by the application interfacing with the runtime, instead of being
+/// created by a connector.
+pub struct Port {
+    // Once created, these values are immutable
+    pub self_id: PortIdLocal,
+    pub peer_id: PortIdLocal,
+    pub kind: PortKind,
+    // These fields may change, but only through the connector that owns the port
+    pub ownership: PortOwnership,
+    pub owning_connector: u32,
+    pub peer_connector: u32, // might be temporarily inconsistent while the peer port is sent around in the non-sync phase
+}
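`PortIdLocal` uses `u32::MAX` as an in-band "invalid" sentinel instead of wrapping the ID in an `Option`, which keeps `Port` and the message structs flat. A tiny sketch of the scheme:

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct PortIdLocal { index: u32 }

    impl PortIdLocal {
        fn new(index: u32) -> Self { Self { index } }
        fn new_invalid() -> Self { Self { index: u32::MAX } }
        fn is_valid(&self) -> bool { self.index != u32::MAX }
    }

    fn main() {
        let peer = PortIdLocal::new_invalid(); // port not yet paired
        assert!(!peer.is_valid());
        let peer = PortIdLocal::new(42); // now paired
        assert!(peer.is_valid());
    }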
+
+// TODO: Turn port ID into its own type
+pub struct Channel {
+    pub putter_id: u32, // one can `put` on it, so from the connector's point of view this is an output
+    pub getter_id: u32, // vice versa: one can `get` on it, so an input for the connector
+}
\ No newline at end of file
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index b772b87602a537819130076dfe9751d213c33a2f..12899dc5c6bb601ef88d144ca083df520971ada3 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
 use std::thread;
 
 use crate::ProtocolDescription;
+use super::inbox::InboxMessage;
 use super::connector::{Connector, ConnectorScheduling, RunDeltaState};
 use super::global_store::GlobalStore;
 
@@ -23,13 +24,13 @@ impl Scheduler {
         // Setup global storage and workspaces that are reused for every
         // connector that we run
         // TODO: @Memory, scheme for reducing allocations if excessive.
-        let mut delta_state = RunDeltaState::new()
+        let mut delta_state = RunDeltaState::new();
 
         loop {
             // TODO: Check if we're supposed to exit
 
             // Retrieve a unit of work
-            let connector_key = self.global.pop_key();
+            let connector_key = self.global.connector_queue.pop_front();
             if connector_key.is_none() {
                 // TODO: @Performance, needs condition variable for waking up
                 thread::sleep(Duration::new(1, 0));
@@ -38,13 +39,15 @@ impl Scheduler {
 
             // We have something to do
             let connector_key = connector_key.unwrap();
-            let connector = self.global.get_connector(&connector_key);
+            let connector = self.global.connectors.get_mut(&connector_key);
 
             let mut cur_schedule = ConnectorScheduling::Immediate;
 
             while cur_schedule == ConnectorScheduling::Immediate {
                 let new_schedule;
 
+                // TODO: Check inbox for new messages
+
                 if connector.is_in_sync_mode() {
                     // In synchronous mode, so we can expect messages being sent,
                     // but we never expect the creation of connectors
@@ -52,8 +55,27 @@ impl Scheduler {
                     debug_assert!(delta_state.new_connectors.is_empty());
 
                     if !delta_state.outbox.is_empty() {
+                        // There are messages to send
                         for message in delta_state.outbox.drain(..) {
+                            let (inbox_message, target_connector_id) = {
+                                // Note: retrieving a port incurs a read lock
+                                let sending_port = self.global.ports.get(&connector_key, message.sending_port);
+                                (
+                                    InboxMessage {
+                                        sending_port: sending_port.self_id,
+                                        receiving_port: sending_port.peer_id,
+                                        sender_prev_branch_id: message.sender_prev_branch_id,
+                                        sender_cur_branch_id: message.sender_cur_branch_id,
+                                        message: message.message,
+                                    },
+                                    sending_port.peer_connector,
+                                )
+                            };
+
+                            let target_connector = self.global.connectors.get_shared(target_connector_id);
+                            target_connector.inbox.insert_message(inbox_message);
+                            // TODO: Check silent state. Queue connector if it was silent
                         }
                     }
                 } else {
@@ -65,7 +87,13 @@
                     if !delta_state.new_connectors.is_empty() {
                         // Push all connectors into the global state and queue them
                         // for execution
-
+                        for connector in delta_state.new_connectors.drain(..) {
+                            // Create the connector, modify all of the ports that
+                            // it now owns, then queue it for execution
+                            let connector_key = self.global.connectors.create(connector);
+
+                            self.global.connector_queue.push_back(connector_key);
+                        }
                     }
                 }
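The scheduler's outer loop is a plain queue consumer: pop a `ConnectorKey`, run that connector until it no longer schedules itself immediately, and (for now) sleep when the queue is empty. A minimal runnable sketch of that loop shape, substituting `Mutex<VecDeque>` for the crate's `MpmcQueue` and exiting instead of the patch's TODO sleep:

    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};
    use std::thread;

    type ConnectorKey = u32;

    fn main() {
        let queue: Arc<Mutex<VecDeque<ConnectorKey>>> = Arc::new(Mutex::new((0..4).collect()));

        let workers: Vec<_> = (0..2).map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                loop {
                    // Retrieve a unit of work; exit instead of sleeping when empty
                    let key = queue.lock().unwrap().pop_front();
                    match key {
                        Some(key) => println!("running connector {}", key),
                        None => break,
                    }
                }
            })
        }).collect();

        for worker in workers { worker.join().unwrap(); }
    }

Because connectors re-enter the queue only when they have work (or when a message arrives at a sleeping connector), at most one thread ever holds a given `ConnectorKey`, which is the invariant `ConnectorStore::get_mut` relies on.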