diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 22299972b63d32556f9f855b82fd5f9e73066e32..e822bdff686170be618cdaab571777e698bd80b6 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,6 +1,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::atomic::Ordering; +use crate::protocol::eval::EvalError; use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortState, PortIdLocal}; @@ -393,6 +394,7 @@ pub(crate) struct ComponentCtx { changed_in_sync: bool, outbox: VecDeque, state_changes: VecDeque, + // Workspaces that may be used by components to (generally) prevent // allocations. Be a good scout and leave it empty after you've used it. // TODO: Move to scheduler ctx, this is the wrong place @@ -431,6 +433,14 @@ impl ComponentCtx { self.state_changes.push_back(ComponentStateChange::CreatedPort(port)) } + /// Notify the runtime of an error. Note that this will not perform any + /// special action beyond printing the error. The component is responsible + /// for waiting until it is appropriate to shut down (i.e. being outside + /// of a sync region) and returning the `Exit` scheduling code. + pub(crate) fn push_error(&mut self, error: EvalError) { + + } + #[inline] pub(crate) fn get_ports(&self) -> &[Port] { return self.ports.as_slice(); @@ -462,9 +472,17 @@ impl ComponentCtx { /// Submit a message for the scheduler to send to the appropriate receiver. /// May only be called inside of a sync block. - pub(crate) fn submit_message(&mut self, contents: Message) { + pub(crate) fn submit_message(&mut self, contents: Message) -> Result<(), ()> { debug_assert!(self.is_in_sync); + if let Some(port_id) = contents.source_port() { + if self.get_port_by_id(port_id).is_none() { + // We don't own the port + return Err(()); + } + } + self.outbox.push_back(contents); + return Ok(()); } /// Notify that component just finished a sync block. Like