diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index e822bdff686170be618cdaab571777e698bd80b6..911674012bf95991b546a585dc61ef9570afbc16 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::atomic::Ordering; use crate::protocol::eval::EvalError; +use crate::runtime2::port::ChannelId; use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortState, PortIdLocal}; @@ -134,7 +135,7 @@ impl Scheduler { while let Some(message) = scheduled.public.inbox.take_message() { // Check if the message has to be rerouted because we have moved the // target port to another component. - self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message)); + self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message)); if let Some(target_port) = Self::get_message_target_port(&message) { if let Some(other_component_id) = scheduled.router.should_reroute(target_port) { self.debug_conn(connector_id, " ... Rerouting the message"); @@ -206,7 +207,7 @@ impl Scheduler { // Handling any messages that were sent while let Some(message) = scheduled.ctx.outbox.pop_front() { - self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message)); + self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:#?}", message)); let target_component_id = match &message { Message::Data(content) => { @@ -221,13 +222,22 @@ impl Scheduler { port_desc.peer_connector }, - Message::Sync(content) => { + Message::SyncComp(content) => { // Sync messages are always sent to a particular component, // the sender must make sure it actually wants to send to // the specified component (and is not using an inconsistent // component ID associated with a port). content.target_component_id }, + Message::SyncPort(content) => { + let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap(); + debug_assert_eq!(port_desc.peer_id, content.target_port); + if port_desc.state == PortState::Closed { + todo!("handle sending over a closed port") + } + + port_desc.peer_connector + } Message::Control(_) => { unreachable!("component sending control messages directly"); } @@ -339,7 +349,8 @@ impl Scheduler { fn get_message_target_port(message: &Message) -> Option { match message { Message::Data(data) => return Some(data.data_header.target_port), - Message::Sync(_) => {}, + Message::SyncComp(_) => {}, + Message::SyncPort(content) => return Some(content.target_port), Message::Control(control) => { match &control.content { ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id), @@ -354,11 +365,11 @@ impl Scheduler { // TODO: Remove, this is debugging stuff fn debug(&self, message: &str) { - // println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); + println!("DEBUG [thrd:{:02} conn: ]: {}", self.scheduler_id, message); } fn debug_conn(&self, conn: ConnectorId, message: &str) { - // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); + println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message); } } @@ -450,6 +461,10 @@ impl ComponentCtx { return self.ports.iter().find(|v| v.self_id == id); } + pub(crate) fn get_port_by_channel_id(&self, id: ChannelId) -> Option<&Port> { + return self.ports.iter().find(|v| v.channel_id == id); + } + fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> { return self.ports.iter_mut().find(|v| v.self_id == id); } @@ -528,10 +543,14 @@ impl ComponentCtx { self.inbox_len_read += 1; return Some(Message::Data(content.clone())); }, - Message::Sync(_) => { + Message::SyncComp(_) => { let message = self.inbox_messages.remove(self.inbox_len_read); return Some(message); }, + Message::SyncPort(_) => { + let message = self.inbox_messages.remove(self.inbox_len_read); + return Some(message); + } Message::Control(_) => unreachable!("control message ended up in component inbox"), } }