diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 60626e93624e74d23fdd3ea1d7cc795385b9295f..9ed1ce2cf657e3b23eb5de8932b9253a740d1f03 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -3,12 +3,12 @@ use std::sync::Arc; use std::sync::atomic::Ordering; use crate::runtime2::inbox2::ControlContent; -use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey, ConnectorVariant}; +use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortState, PortIdLocal}; use super::native::Connector; use super::branch::{BranchId}; use super::connector2::{ConnectorPDL, ConnectorScheduling}; -use super::inbox2::{MessageFancy, DataMessageFancy, SyncMessageFancy, ControlMessageFancy}; +use super::inbox2::{MessageFancy, DataMessageFancy, ControlMessageFancy}; // Because it contains pointers we're going to do a copy by value on this one #[derive(Clone, Copy)] @@ -127,17 +127,20 @@ impl Scheduler { let connector_id = scheduled.ctx_fancy.id; while let Some(message) = scheduled.public.inbox.take_message() { - // Check for rerouting - self.debug_conn(connector_id, &format!("Handling message from conn({}) at port({})\n --- {:?}", message.sending_connector.0, message.receiving_port.index, message)); - if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) { - self.debug_conn(connector_id, &format!(" ... Rerouting to connector {}", other_connector_id.0)); - self.runtime.send_message(other_connector_id, message); - continue; + // Check if the message has to be rerouted because we have moved the + // target port to another component. + self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message)); + if let Some(target_port) = Self::get_data_message_target_port(&message) { + if let Some(other_component_id) = scheduled.router.should_reroute(target_port) { + self.debug_conn(connector_id, " ... Rerouting the message"); + self.runtime.send_message(other_component_id, message); + continue; + } } - // Handle special messages here, messages for the component - // will be added to the inbox. - self.debug_conn(connector_id, " ... Handling message myself"); + // If here, then we should handle the message + self.debug_conn(connector_id, " ... Handling the message"); + match message { MessageFancy::Control(message) => { match message.content { @@ -197,7 +200,7 @@ impl Scheduler { let connector_id = scheduled.ctx_fancy.id; // Handling any messages that were sent - while let Some(mut message) = scheduled.ctx_fancy.outbox.pop_front() { + while let Some(message) = scheduled.ctx_fancy.outbox.pop_front() { self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message)); let target_component_id = match &message { @@ -231,20 +234,22 @@ impl Scheduler { while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() { match state_change { ComponentStateChange::CreatedComponent(component, initial_ports) => { - // Add the new connector to the global registry + // Creating a new component. The creator needs to relinquish + // ownership of the ports that are given to the new + // component. All data messages that were intended for that + // port also needs to be transferred. let new_key = self.runtime.create_pdl_component(component, false); let new_connector = self.runtime.get_component_private(&new_key); - // Transfer ports for port_id in initial_ports { // Transfer messages associated with the transferred port let mut message_idx = 0; while message_idx < scheduled.ctx_fancy.inbox_messages.len() { let message = &scheduled.ctx_fancy.inbox_messages[message_idx]; - if message.receiving_port == *port_id { + if Self::get_data_message_target_port(message) == Some(port_id) { // Need to transfer this message - let taken_message = scheduled.ctx_fancy.inbox_messages.remove(message_idx); - new_connector.ctx_fancy.inbox_messages.push(taken_message); + let message = scheduled.ctx_fancy.inbox_messages.remove(message_idx); + new_connector.ctx_fancy.inbox_messages.push(message); } else { message_idx += 1; } @@ -252,7 +257,7 @@ impl Scheduler { // Transfer the port itself let port_index = scheduled.ctx_fancy.ports.iter() - .position(|v| v.self_id == *port_id) + .position(|v| v.self_id == port_id) .unwrap(); let port = scheduled.ctx_fancy.ports.remove(port_index); new_connector.ctx_fancy.ports.push(port.clone()); @@ -324,6 +329,15 @@ impl Scheduler { } } + #[inline] + fn get_data_message_target_port(message: &MessageFancy) -> Option { + if let MessageFancy::Data(message) = message { + return Some(message.data_header.target_port) + } + + return None + } + // TODO: Remove, this is debugging stuff fn debug(&self, message: &str) { println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); @@ -404,6 +418,11 @@ impl ComponentCtxFancy { self.state_changes.push_back(ComponentStateChange::CreatedPort(port)) } + #[inline] + pub(crate) fn get_ports(&self) -> &[Port] { + return self.ports.as_slice(); + } + pub(crate) fn get_port_by_id(&self, id: PortIdLocal) -> Option<&Port> { return self.ports.iter().find(|v| v.self_id == id); } @@ -413,15 +432,14 @@ impl ComponentCtxFancy { } /// Notify that component will enter a sync block. Note that after calling - /// this function you must allow the scheduler to pick up the changes in - /// the context by exiting your `Component::run` function with an - /// appropriate scheduling value. - pub(crate) fn notify_sync_start(&mut self) -> &[Port] { + /// this function you must allow the scheduler to pick up the changes in the + /// context by exiting your code-executing loop, and to continue executing + /// code the next time the scheduler picks up the component. + pub(crate) fn notify_sync_start(&mut self) { debug_assert!(!self.is_in_sync); self.is_in_sync = true; self.changed_in_sync = true; - return &self.ports } #[inline] @@ -614,7 +632,7 @@ impl ControlMessageHandler { /// Returns true if the supplied message should be rerouted. If so then this /// function returns the connector that should retrieve this message. - pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option { + pub fn should_reroute(&self, target_port: PortIdLocal) -> Option { for entry in &self.active { if let ControlVariant::ChangedPort(entry) = &entry.variant { if entry.target_port == target_port {