diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index be781d33c4b90d74713c508c50d9e396d52ca4eb..1c31694e3d45b6214a10052fbd1104db80d1058b 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,8 +1,9 @@ use std::sync::Arc; use std::sync::atomic::Ordering; -use crate::runtime2::ScheduledConnector; +use crate::runtime2::connector::{BranchId, ConnectorPDL}; +use crate::runtime2::inbox::{DataMessage, PrivateInbox}; -use super::{RuntimeInner, ConnectorId, ConnectorKey}; +use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortState, PortIdLocal}; use super::native::Connector; use super::connector::{ConnectorScheduling, RunDeltaState}; @@ -366,6 +367,159 @@ impl Scheduler { } } +// ----------------------------------------------------------------------------- +// ComponentCtx +// ----------------------------------------------------------------------------- + +enum ComponentStateChange { + CreatedComponent(ConnectorPDL), + CreatedPort(Port), + ChangedPort(ComponentPortChange), +} + +#[derive(Clone)] +pub(crate) enum ComponentPortChange { + Acquired(Port), + Released(Port), +} + +struct InboxMessage { + target_port: PortIdLocal, + data: DataMessage, +} + +/// The component context (better name may be invented). This was created +/// because part of the component's state is managed by the scheduler, and part +/// of it by the component itself. When the component starts a sync block or +/// exits a sync block the partially managed state by both component and +/// scheduler need to be exchanged. +pub(crate) struct ComponentCtxFancy { + // Mostly managed by the scheduler + id: ConnectorId, + ports: Vec, + inbox_messages: Vec, + inbox_len_read: usize, + // Submitted by the component + is_in_sync: bool, + changed_in_sync: bool, + outbox: Vec, + state_changes: Vec +} + +impl ComponentCtxFancy { + /// Notify the runtime that the component has created a new component. May + /// only be called outside of a sync block. + pub(crate) fn push_component(&mut self, component: ConnectorPDL) { + debug_assert!(!self.is_in_sync); + self.state_changes.push(ComponentStateChange::CreatedComponent(component)); + } + + /// Notify the runtime that the component has created a new port. May only + /// be called outside of a sync block (for ports received during a sync + /// block, pass them when calling `notify_sync_end`). + pub(crate) fn push_port(&mut self, port: Port) { + debug_assert!(!self.is_in_sync); + self.state_changes.push(ComponentStateChange::CreatedPort(port)) + } + + /// Notify that component will enter a sync block. + pub(crate) fn notify_sync_start(&mut self) -> &[Port] { + debug_assert!(!self.is_in_sync); + + self.is_in_sync = true; + self.changed_in_sync = true; + return &self.ports + } + + /// Submit a message for the scheduler to send to the appropriate receiver. + /// May only be called inside of a sync block. + pub(crate) fn submit_message(&mut self, contents: MessageContents) { + debug_assert!(self.is_in_sync); + self.outbox.push(contents); + } + + /// Notify that component just finished a sync block. + pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) { + debug_assert!(self.is_in_sync); + + self.is_in_sync = false; + self.changed_in_sync = true; + + self.state_changes.reserve(changed_ports.len()); + for changed_port in changed_ports { + self.state_changes.push(ComponentStateChange::ChangedPort(changed_port.clone())); + } + } + + /// Inserts message into inbox. Generally only called by scheduler. + pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, data: DataMessage) { + debug_assert!(!self.inbox_messages.iter().any(|v| { + v.target_port == target_port && + v.data.sender_prev_branch_id == data.sender_prev_branch_id && + v.data.sender_cur_branch_id == data.sender_cur_branch_id + })); + + self.inbox_messages.push(InboxMessage{ target_port, data }) + } + + /// Retrieves messages matching a particular port and branch id. But only + /// those messages that have been previously received with + /// `read_next_message`. + pub(crate) fn get_read_messages(&self, match_port_id: PortIdLocal, match_prev_branch_id: BranchId) -> MessagesIter { + return MessageIter { + messages: &self.inbox_messages, + next_index: 0, + max_index: self.inbox_len_read, + match_port_id, match_prev_branch_id + }; + } + + /// Retrieves the next unread message from the inbox `None` if there are no + /// (new) messages to read. + pub(crate) fn read_next_message(&mut self) -> Option<(&PortIdLocal, &DataMessage)> { + if self.inbox_len_read == self.inbox_messages.len() { + return None; + } + + let message = &self.inbox_messages[self.inbox_len_read]; + self.inbox_len_read += 1; + return Some((&message.target_port, &message.data)) + } +} + +pub(crate) struct MessagesIter<'a> { + messages: &'a [InboxMessage], + next_index: usize, + max_index: usize, + match_port_id: PortIdLocal, + match_prev_branch_id: BranchId, +} + +impl Iterator for MessagesIter { + type Item = DataMessage; + + fn next(&mut self) -> Option<&Self::Item> { + // Loop until match is found or at end of messages + while self.next_index < self.max_index { + let message = &self.messages[self.next_index]; + if message.target_port == self.match_port_id && message.data.sender_prev_branch_id == self.match_prev_branch_id { + // Found a match + break; + } + + self.next_index += 1; + } + + if self.next_index == self.max_index { + return None; + } + + let message = &self.messages[self.next_index]; + self.next_index += 1; + return Some(&message.data); + } +} + // ----------------------------------------------------------------------------- // Control messages // -----------------------------------------------------------------------------