diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index fd5a8c1f4a1ae39e5134f103a1bcb423e059a189..a98334a747a63370f502708fc830e019e335b43c 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -22,9 +22,10 @@ use crate::ProtocolDescription; use inbox::Message; use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; -use scheduler::{Scheduler, ConnectorCtx, Router}; +use scheduler::{Scheduler, ConnectorCtx, ControlMessageHandler}; use native::{Connector, ConnectorApplication, ApplicationInterface}; use crate::runtime2::port::Port; +use crate::runtime2::scheduler::SchedulerCtx; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this @@ -84,10 +85,10 @@ impl Connector for ConnectorVariant { } } - fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, scheduler_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { match self { - ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state), - ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state), + ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, conn_ctx, delta_state), + ConnectorVariant::Native(c) => c.run(scheduler_ctx, conn_ctx, delta_state), } } } @@ -96,7 +97,9 @@ pub(crate) struct ScheduledConnector { pub connector: ConnectorVariant, // access by connector pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector pub public: ConnectorPublic, // accessible by all schedulers and connectors - pub router: Router, + pub router: ControlMessageHandler, + pub shutting_down: bool, + pub pending_acks: u32, } // ----------------------------------------------------------------------------- @@ -214,11 +217,11 @@ impl RuntimeInner { self.scheduler_notifier.notify_one(); } - // --- Creating ports + // --- Creating/using ports /// Creates a new port pair. Note that these are stored globally like the /// connectors are. Ports stored by components belong to those components. - pub(crate) fn create_channel(&self) -> (Port, Port) { + pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> (Port, Port) { use port::{PortIdLocal, PortKind}; let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); @@ -229,18 +232,34 @@ impl RuntimeInner { self_id: getter_id, peer_id: putter_id, kind: PortKind::Getter, - peer_connector: self.connector_id, + peer_connector: creating_connector, }; let putter_port = Port{ self_id: putter_id, peer_id: getter_id, kind: PortKind::Putter, - peer_connector: self.connector_id, + peer_connector: creating_connector, }; return (getter_port, putter_port); } + /// Sends a message to a particular connector. If the connector happened to + /// be sleeping then it will be scheduled for execution. + pub(crate) fn send_message(&self, target_id: ConnectorId, message: Message) { + let target = self.get_component_public(target_id); + target.inbox.insert_message(message); + + let should_wake_up = target.sleeping + .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) + .is_ok(); + + if should_wake_up { + let key = unsafe{ ConnectorKey::from_id(target_id) }; + self.push_work(key); + } + } + // --- Creating/retrieving/destroying components pub(crate) fn create_interface_component(&self, component: ConnectorApplication) -> ConnectorKey { @@ -259,7 +278,7 @@ impl RuntimeInner { // Create as not sleeping, as we'll schedule it immediately let key = { let mut lock = self.connectors.write().unwrap(); - lock.create(ConnectorVariant::UserDefined(connector), true) + lock.create(ConnectorVariant::UserDefined(connector), false) }; // Transfer the ports @@ -283,11 +302,13 @@ impl RuntimeInner { return key; } + #[inline] pub(crate) fn get_component_private(&self, connector_key: &ConnectorKey) -> &'static mut ScheduledConnector { let lock = self.connectors.read().unwrap(); return lock.get_private(connector_key); } + #[inline] pub(crate) fn get_component_public(&self, connector_id: ConnectorId) -> &'static ConnectorPublic { let lock = self.connectors.read().unwrap(); return lock.get_public(connector_id); @@ -406,7 +427,9 @@ impl ConnectorStore { connector, context: ConnectorCtx::new(), public: ConnectorPublic::new(initially_sleeping), - router: Router::new(), + router: ControlMessageHandler::new(), + shutting_down: false, + pending_acks: 0, }; let index;