diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs
index fffbad2d7b3a76febb3067a58d4e0c31173c2fe1..916e7cd1c78c2580cd51044c2041e7d67c88745c 100644
--- a/src/runtime2/connector.rs
+++ b/src/runtime2/connector.rs
@@ -4,7 +4,7 @@ use std::sync::atomic::AtomicBool;
 use crate::{PortId, ProtocolDescription};
 use crate::protocol::{ComponentState, RunContext, RunResult};
 use crate::protocol::eval::{Prompt, Value, ValueGroup};
-use crate::runtime2::scheduler::Scheduler;
+use crate::runtime2::scheduler::{ComponentCtxFancy, Scheduler};
 
 use super::ConnectorId;
 use super::native::Connector;
@@ -410,11 +410,11 @@ impl Connector for ConnectorPDL {
         }
     }
 
-    fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
+    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
         if self.in_sync {
             // Check for new messages we haven't seen before. If any of the
             // pending branches can accept the message, do so.
-            while let Some((target_port_id, message)) = self.inbox.next_message() {
+            while let Some((target_port_id, message)) = comp_ctx.read_next_message() {
                 let mut branch_idx = self.sync_pending_get.first;
                 while branch_idx != 0 {
                     let branch = &self.branches[branch_idx as usize];
@@ -444,7 +444,7 @@ impl Connector for ConnectorPDL {
                 }
             }
 
-            let scheduling = self.run_in_speculative_mode(sched_ctx, conn_ctx, delta_state);
+            let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx, conn_ctx, delta_state);
 
             // When in speculative mode we might have generated new sync
             // solutions, we need to turn them into proposed solutions here.
@@ -493,7 +493,7 @@ impl Connector for ConnectorPDL {
 
             return scheduling;
         } else {
-            let scheduling = self.run_in_deterministic_mode(sched_ctx, conn_ctx, delta_state);
+            let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx, conn_ctx, delta_state);
             return scheduling;
         }
     }
@@ -527,7 +527,7 @@ impl ConnectorPDL {
     // -------------------------------------------------------------------------
 
     pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
-        self.inbox.insert_message(target_port, message);
+        // self.inbox.insert_message(target_port, message);
     }
 
     /// Accepts a synchronous message and combines it with the locally stored
@@ -764,7 +764,7 @@ impl ConnectorPDL {
    /// where it is the caller's responsibility to immediately take care of
    /// those changes. The return value indicates when (and if) the connector
    /// needs to be scheduled again.
-    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
+    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
         debug_assert!(self.in_sync);
 
         if self.sync_active.is_empty() {
@@ -864,7 +864,7 @@ impl ConnectorPDL {
 
                         // But if some messages can be immediately applied, do so
                         // now.
-                        let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id);
+                        let messages = comp_ctx.get_read_messages(local_port_id, port_mapping.last_registered_branch_id);
                         let mut did_have_messages = false;
 
                         for message in messages {
@@ -986,7 +986,7 @@ impl ConnectorPDL {
     }
 
    /// Runs the connector in non-synchronous mode.
-    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
+    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
         debug_assert!(!self.in_sync);
         debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
         debug_assert!(self.branches.len() == 1);
diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs
index cf25163fb61249b33639cc6d0d24bb2acba8e214..f91ec36c5879a3eab121b373a0bfa3acd8a0dd7e 100644
--- a/src/runtime2/native.rs
+++ b/src/runtime2/native.rs
@@ -5,6 +5,7 @@ use std::sync::atomic::Ordering;
 use crate::protocol::ComponentCreationError;
 use crate::protocol::eval::ValueGroup;
 use crate::ProtocolDescription;
+use crate::runtime2::scheduler::ComponentCtxFancy;
 
 use super::{ConnectorKey, ConnectorId, RuntimeInner, ConnectorCtx};
 use super::scheduler::SchedulerCtx;
@@ -21,7 +22,7 @@ pub(crate) trait Connector {
     fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState);
 
     /// Should run the connector's behaviour up until the next blocking point.
-    fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
+    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling;
 }
 
 type SyncDone = Arc<(Mutex, Condvar)>;
@@ -66,7 +67,7 @@ impl Connector for ConnectorApplication {
         }
     }
 
-    fn run(&mut self, _sched_ctx: SchedulerCtx, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
+    fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy, _conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
         let mut queue = self.job_queue.lock().unwrap();
         while let Some(job) = queue.pop_front() {
             match job {
diff --git a/src/runtime2/port.rs b/src/runtime2/port.rs
index 42b9ee27cfa77e70e25dc084aa8a382aeeefd9f3..b94443dc568a7606001e3f6477046d188e499d05 100644
--- a/src/runtime2/port.rs
+++ b/src/runtime2/port.rs
@@ -37,6 +37,7 @@ pub enum PortState {
 /// a connector on its port, which may not be consistent with the rest of the
 /// global system (e.g. its peer was moved to a new connector, or the peer might
 /// have died in the meantime, so it is no longer usable).
+#[derive(Clone)]
 pub struct Port {
     pub self_id: PortIdLocal,
     pub peer_id: PortIdLocal,
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index be781d33c4b90d74713c508c50d9e396d52ca4eb..1c31694e3d45b6214a10052fbd1104db80d1058b 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -1,8 +1,9 @@
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
 
-use crate::runtime2::ScheduledConnector;
+use crate::runtime2::connector::{BranchId, ConnectorPDL};
+use crate::runtime2::inbox::{DataMessage, PrivateInbox};
-use super::{RuntimeInner, ConnectorId, ConnectorKey};
+use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 use super::port::{Port, PortState, PortIdLocal};
 use super::native::Connector;
 use super::connector::{ConnectorScheduling, RunDeltaState};
@@ -366,6 +367,159 @@ impl Scheduler {
     }
 }
 
+// -----------------------------------------------------------------------------
+// ComponentCtx
+// -----------------------------------------------------------------------------
+
+enum ComponentStateChange {
+    CreatedComponent(ConnectorPDL),
+    CreatedPort(Port),
+    ChangedPort(ComponentPortChange),
+}
+
+#[derive(Clone)]
+pub(crate) enum ComponentPortChange {
+    Acquired(Port),
+    Released(Port),
+}
+
+struct InboxMessage {
+    target_port: PortIdLocal,
+    data: DataMessage,
+}
+
+/// The component context (better name may be invented). This was created
+/// because part of the component's state is managed by the scheduler, and part
+/// of it by the component itself. When the component enters or exits a sync
+/// block, the state that is partly managed by the component and partly by the
+/// scheduler needs to be exchanged.
+pub(crate) struct ComponentCtxFancy {
+    // Mostly managed by the scheduler
+    id: ConnectorId,
+    ports: Vec<Port>,
+    inbox_messages: Vec<InboxMessage>,
+    inbox_len_read: usize,
+    // Submitted by the component
+    is_in_sync: bool,
+    changed_in_sync: bool,
+    outbox: Vec<MessageContents>,
+    state_changes: Vec<ComponentStateChange>,
+}
+
+impl ComponentCtxFancy {
+    /// Notify the runtime that the component has created a new component. May
+    /// only be called outside of a sync block.
+    pub(crate) fn push_component(&mut self, component: ConnectorPDL) {
+        debug_assert!(!self.is_in_sync);
+        self.state_changes.push(ComponentStateChange::CreatedComponent(component));
+    }
+
+    /// Notify the runtime that the component has created a new port. May only
+    /// be called outside of a sync block (for ports received during a sync
+    /// block, pass them when calling `notify_sync_end`).
+    pub(crate) fn push_port(&mut self, port: Port) {
+        debug_assert!(!self.is_in_sync);
+        self.state_changes.push(ComponentStateChange::CreatedPort(port));
+    }
+
+    /// Notify the runtime that the component will enter a sync block.
+    pub(crate) fn notify_sync_start(&mut self) -> &[Port] {
+        debug_assert!(!self.is_in_sync);
+
+        self.is_in_sync = true;
+        self.changed_in_sync = true;
+        return &self.ports;
+    }
+
+    /// Submit a message for the scheduler to send to the appropriate receiver.
+    /// May only be called inside of a sync block.
+    pub(crate) fn submit_message(&mut self, contents: MessageContents) {
+        debug_assert!(self.is_in_sync);
+        self.outbox.push(contents);
+    }
+
+    /// Notify the runtime that the component has just finished a sync block.
+    pub(crate) fn notify_sync_end(&mut self, changed_ports: &[ComponentPortChange]) {
+        debug_assert!(self.is_in_sync);
+
+        self.is_in_sync = false;
+        self.changed_in_sync = true;
+
+        self.state_changes.reserve(changed_ports.len());
+        for changed_port in changed_ports {
+            self.state_changes.push(ComponentStateChange::ChangedPort(changed_port.clone()));
+        }
+    }
+
+    /// Inserts a message into the inbox. Generally only called by the scheduler.
+    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, data: DataMessage) {
+        debug_assert!(!self.inbox_messages.iter().any(|v| {
+            v.target_port == target_port &&
+            v.data.sender_prev_branch_id == data.sender_prev_branch_id &&
+            v.data.sender_cur_branch_id == data.sender_cur_branch_id
+        }));
+
+        self.inbox_messages.push(InboxMessage{ target_port, data });
+    }
+
+    /// Retrieves the messages matching a particular port and branch id, but
+    /// only those messages that have previously been returned by
+    /// `read_next_message`.
+    pub(crate) fn get_read_messages(&self, match_port_id: PortIdLocal, match_prev_branch_id: BranchId) -> MessagesIter {
+        return MessagesIter {
+            messages: &self.inbox_messages,
+            next_index: 0,
+            max_index: self.inbox_len_read,
+            match_port_id, match_prev_branch_id
+        };
+    }
+
+    /// Retrieves the next unread message from the inbox, returning `None` if
+    /// there are no (new) messages to read.
+    pub(crate) fn read_next_message(&mut self) -> Option<(&PortIdLocal, &DataMessage)> {
+        if self.inbox_len_read == self.inbox_messages.len() {
+            return None;
+        }
+
+        let message = &self.inbox_messages[self.inbox_len_read];
+        self.inbox_len_read += 1;
+        return Some((&message.target_port, &message.data));
+    }
+}
+
+pub(crate) struct MessagesIter<'a> {
+    messages: &'a [InboxMessage],
+    next_index: usize,
+    max_index: usize,
+    match_port_id: PortIdLocal,
+    match_prev_branch_id: BranchId,
+}
+
+impl<'a> Iterator for MessagesIter<'a> {
+    type Item = &'a DataMessage;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // Loop until a match is found or we are at the end of the messages
+        while self.next_index < self.max_index {
+            let message = &self.messages[self.next_index];
+            if message.target_port == self.match_port_id && message.data.sender_prev_branch_id == self.match_prev_branch_id {
+                // Found a match
+                break;
+            }
+
+            self.next_index += 1;
+        }
+
+        if self.next_index == self.max_index {
+            return None;
+        }
+
+        let message = &self.messages[self.next_index];
+        self.next_index += 1;
+        return Some(&message.data);
+    }
+}
+
 // -----------------------------------------------------------------------------
 // Control messages
 // -----------------------------------------------------------------------------
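
Below is a small, self-contained sketch (not part of the patch) of the inbox-cursor pattern that `ComponentCtxFancy` introduces: the scheduler appends incoming data messages, the component consumes unseen messages once through a read cursor, and messages that were already read can later be re-queried per port and previous branch id. All names here (`MockInbox`, `MockMessage`, the plain `u32` ids) are simplified stand-ins, not the runtime's real types or API.

// Illustrative stand-ins only: `MockMessage`/`MockInbox` are not runtime types.
#[derive(Debug)]
struct MockMessage {
    sender_prev_branch_id: u32,
    payload: &'static str,
}

struct MockInbox {
    // (target port, message) pairs, in arrival order.
    messages: Vec<(u32, MockMessage)>,
    // Everything before this index has already been handed to the component.
    len_read: usize,
}

impl MockInbox {
    // Scheduler side: buffer an incoming message.
    fn insert_message(&mut self, target_port: u32, data: MockMessage) {
        self.messages.push((target_port, data));
    }

    // Component side: hand out the next unseen message and advance the cursor.
    fn read_next_message(&mut self) -> Option<&(u32, MockMessage)> {
        if self.len_read == self.messages.len() {
            return None;
        }
        let message = &self.messages[self.len_read];
        self.len_read += 1;
        Some(message)
    }

    // Component side: revisit already-read messages for one port/branch pair.
    fn get_read_messages(&self, port: u32, prev_branch: u32) -> impl Iterator<Item = &MockMessage> {
        self.messages[..self.len_read]
            .iter()
            .filter(move |(p, m)| *p == port && m.sender_prev_branch_id == prev_branch)
            .map(|(_, m)| m)
    }
}

fn main() {
    let mut inbox = MockInbox { messages: Vec::new(), len_read: 0 };
    inbox.insert_message(1, MockMessage { sender_prev_branch_id: 0, payload: "a" });
    inbox.insert_message(2, MockMessage { sender_prev_branch_id: 0, payload: "b" });

    // First pass: consume messages that have not been seen yet.
    while let Some((port, msg)) = inbox.read_next_message() {
        println!("new message on port {}: {:?}", port, msg);
    }

    // Later: a (speculative) branch revisits what was already read on port 1.
    for msg in inbox.get_read_messages(1, 0) {
        println!("replay on port 1: {:?}", msg);
    }
}

The split between `read_next_message` and `get_read_messages` appears to exist so that, during a sync round, speculative branches can revisit messages an earlier branch already consumed without the scheduler having to re-deliver them; this mirrors how `ConnectorPDL::run` drains new messages once and `run_in_speculative_mode` re-reads them per port and registered branch.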