diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 19371b8bea7327ad1854c21af335f37ebd077341..50329ff5cfc43f4f43ca9f68633c9bdcfd98c638 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,16 +1,14 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::atomic::Ordering; -use crate::runtime2::inbox2::{DataMessageFancy, MessageFancy}; +use crate::runtime2::inbox2::ControlContent; use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey, ConnectorVariant}; use super::port::{Port, PortState, PortIdLocal}; use super::native::Connector; -use super::connector::{BranchId, ConnectorPDL, ConnectorScheduling}; -use super::inbox::{ - Message, MessageContents, ControlMessageVariant, - DataMessage, ControlMessage, SolutionMessage, SyncMessage -}; +use super::branch::{BranchId}; +use super::connector2::{ConnectorPDL, ConnectorScheduling}; +use super::inbox2::{MessageFancy, DataMessageFancy, SyncMessageFancy, ControlMessageFancy}; // Because it contains pointers we're going to do a copy by value on this one #[derive(Clone, Copy)] @@ -108,7 +106,7 @@ impl Scheduler { connector_id ); self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message)); - self.runtime.send_message(port.peer_connector, message); + self.runtime.send_message(port.peer_connector, MessageFancy::Control(message)); } } @@ -140,10 +138,10 @@ impl Scheduler { // Handle special messages here, messages for the component // will be added to the inbox. self.debug_conn(connector_id, " ... Handling message myself"); - match message.contents { - MessageContents::Control(content) => { - match content.content { - ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { + match message { + MessageFancy::Control(message) => { + match message.content { + ControlContent::PortPeerChanged(port_id, new_target_connector_id) => { // Need to change port target let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap(); port.peer_connector = new_target_connector_id; @@ -155,43 +153,34 @@ impl Scheduler { debug_assert!(scheduled.ctx_fancy.outbox.is_empty()); // And respond with an Ack - let ack_message = Message{ - sending_connector: connector_id, - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Control(ControlMessage{ - id: content.id, - content: ControlMessageVariant::Ack, - }), - }; + let ack_message = MessageFancy::Control(ControlMessageFancy{ + id: content.id, + sending_component_id: connector_id, + content: ControlContent::Ack, + }); self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message)); - self.runtime.send_message(message.sending_connector, ack_message); + self.runtime.send_message(message.sending_component_id, ack_message); }, - ControlMessageVariant::CloseChannel(port_id) => { + ControlContent::CloseChannel(port_id) => { // Mark the port as being closed let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap(); port.state = PortState::Closed; // Send an Ack - let ack_message = Message{ - sending_connector: connector_id, - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Control(ControlMessage{ - id: content.id, - content: ControlMessageVariant::Ack, - }), - }; + let ack_message = MessageFancy::Control(ControlMessageFancy{ + id: content.id, + sending_component_id: connector_id, + content: ControlContent::Ack, + }); self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message)); - self.runtime.send_message(message.sending_connector, ack_message); + self.runtime.send_message(message.sending_component_id, ack_message); }, - ControlMessageVariant::Ack => { + ControlContent::Ack => { scheduled.router.handle_ack(content.id); - } + }, + ControlContent::Ping => {}, } }, - MessageContents::Ping => { - // Pings are sent just to wake up a component, so - // nothing to do here. - }, _ => { // All other cases have to be handled by the component scheduled.ctx_fancy.inbox_messages.push(message); @@ -203,87 +192,51 @@ impl Scheduler { /// Handles changes to the context that were made by the component. This is /// the way (due to Rust's borrowing rules) that we bubble up changes in the /// component's state that the scheduler needs to know about (e.g. a message - /// that the component wants to send). + /// that the component wants to send, a port that has been added). fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) { let connector_id = scheduled.ctx_fancy.id; // Handling any messages that were sent while let Some(mut message) = scheduled.ctx_fancy.outbox.pop_front() { - // Based on the message contents, decide where the message - // should be sent to. This might end up modifying the message. self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message)); - let (peer_connector, self_port, peer_port) = match &mut message { - MessageContents::Data(contents) => { - let port = scheduled.ctx_fancy.get_port_by_id(contents.sending_port).unwrap(); - (port.peer_connector, contents.sending_port, port.peer_id) - }, - MessageContents::Sync(contents) => { - let connector = contents.to_visit.pop().unwrap(); - (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) - }, - MessageContents::RequestCommit(contents)=> { - let connector = contents.to_visit.pop().unwrap(); - (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) - }, - MessageContents::ConfirmCommit(contents) => { - for to_visit in &contents.to_visit { - let message = Message{ - sending_connector: scheduled.ctx_fancy.id, - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::ConfirmCommit(contents.clone()), - }; - self.runtime.send_message(*to_visit, message); + + let target_component_id = match &message { + MessageFancy::Data(content) => { + // Data messages are always sent to a particular port, and + // may end up being rerouted. + let port_desc = scheduled.ctx_fancy.get_port_by_id(content.data_header.sending_port).unwrap(); + debug_assert_eq!(port_desc.peer_id, content.data_header.target_port); + + if port_desc.state == PortState::Closed { + todo!("handle sending over a closed port") } - (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) + + port_desc.peer_connector }, - MessageContents::Control(_) | MessageContents::Ping => { - // Never generated by the user's code - unreachable!(); + MessageFancy::Sync(content) => { + // Sync messages are always sent to a particular component, + // the sender must make sure it actually wants to send to + // the specified component (and is not using an inconsistent + // component ID associated with a port). + content.sync_header.highest_component_id + }, + MessageFancy::Control(_) => { + unreachable!("component sending control messages directly"); } }; - // TODO: Maybe clean this up, perhaps special case for - // ConfirmCommit can be handled differently. - if peer_connector.is_valid() { - if peer_port.is_valid() { - // Sending a message to a port, so the port may not be - // closed. - let port = scheduled.ctx_fancy.get_port_by_id(self_port).unwrap(); - match port.state { - PortState::Open => {}, - PortState::Closed => { - todo!("Handling sending over a closed port"); - } - } - } - let message = Message { - sending_connector: scheduled.ctx_fancy.id, - receiving_port: peer_port, - contents: message, - }; - self.runtime.send_message(peer_connector, message); - } + self.runtime.send_message(target_component_id, message); } while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() { match state_change { - ComponentStateChange::CreatedComponent(component) => { + ComponentStateChange::CreatedComponent(component, initial_ports) => { // Add the new connector to the global registry let new_key = self.runtime.create_pdl_component(component, false); let new_connector = self.runtime.get_component_private(&new_key); // Transfer ports - // TODO: Clean this up the moment native components are somewhat - // properly implemented. We need to know about the ports that - // are "owned by the PDL code", and then make sure that the - // context contains a description of those ports. - let ports = if let ConnectorVariant::UserDefined(connector) = &new_connector.connector { - &connector.ports.owned_ports - } else { - unreachable!(); - }; - - for port_id in ports { + for port_id in initial_ports { // Transfer messages associated with the transferred port let mut message_idx = 0; while message_idx < scheduled.ctx_fancy.inbox_messages.len() { @@ -311,7 +264,7 @@ impl Scheduler { ); self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message)); - self.runtime.send_message(port.peer_connector, reroute_message); + self.runtime.send_message(port.peer_connector, MessageFancy::Control(reroute_message)); } // Schedule new connector to run @@ -386,7 +339,7 @@ impl Scheduler { // ----------------------------------------------------------------------------- enum ComponentStateChange { - CreatedComponent(ConnectorPDL), + CreatedComponent(ConnectorPDL, Vec), CreatedPort(Port), ChangedPort(ComponentPortChange), } @@ -412,20 +365,14 @@ pub(crate) struct ComponentCtxFancy { is_in_sync: bool, changed_in_sync: bool, outbox: VecDeque, - state_changes: VecDeque + state_changes: VecDeque, // Workspaces that may be used by components to (generally) prevent // allocations. Be a good scout and leave it empty after you've used it. + // TODO: Move to scheduler ctx, this is the wrong place pub workspace_ports: Vec, pub workspace_branches: Vec, } -pub(crate) enum ReceivedMessage { - Data((PortIdLocal, DataMessage)), - Sync(SyncMessage), - RequestCommit(SolutionMessage), - ConfirmCommit(SolutionMessage), -} - impl ComponentCtxFancy { pub(crate) fn new_empty() -> Self { return Self{ @@ -437,14 +384,16 @@ impl ComponentCtxFancy { changed_in_sync: false, outbox: VecDeque::new(), state_changes: VecDeque::new(), + workspace_ports: Vec::new(), + workspace_branches: Vec::new(), }; } /// Notify the runtime that the component has created a new component. May /// only be called outside of a sync block. - pub(crate) fn push_component(&mut self, component: ConnectorPDL) { + pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec) { debug_assert!(!self.is_in_sync); - self.state_changes.push_back(ComponentStateChange::CreatedComponent(component)); + self.state_changes.push_back(ComponentStateChange::CreatedComponent(component, initial_ports)); } /// Notify the runtime that the component has created a new port. May only @@ -520,19 +469,21 @@ impl ComponentCtxFancy { if !self.is_in_sync { return None; } if self.inbox_len_read == self.inbox_messages.len() { return None; } + // We want to keep data messages in the inbox, because we need to check + // them in the future. We don't want to keep sync messages around, we + // should only handle them once. Control messages should never be in + // here. let message = &self.inbox_messages[self.inbox_len_read]; - if let MessageContents::Data(contents) = &message.contents { - self.inbox_len_read += 1; - return Some(ReceivedMessage::Data((message.receiving_port, contents.clone()))); - } else { - // Must be a sync/solution message - let message = self.inbox_messages.remove(self.inbox_len_read); - return match message.contents { - MessageContents::Sync(v) => Some(ReceivedMessage::Sync(v)), - MessageContents::RequestCommit(v) => Some(ReceivedMessage::RequestCommit(v)), - MessageContents::ConfirmCommit(v) => Some(ReceivedMessage::ConfirmCommit(v)), - _ => unreachable!(), // because we only put data/synclike messages in the inbox - } + match message { + MessageFancy::Data(content) => { + self.inbox_len_read += 1; + return Some(MessageFancy::Data(content.clone())); + }, + MessageFancy::Sync(_) => { + let message = self.inbox_messages.remove(self.inbox_len_read); + return Some(message); + }, + MessageFancy::Control(_) => unreachable!("control message ended up in component inbox"), } } } @@ -616,7 +567,7 @@ impl ControlMessageHandler { pub fn prepare_closing_channel( &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId - ) -> Message { + ) -> ControlMessageFancy { let id = self.take_id(); self.active.push(ControlEntry{ @@ -627,13 +578,10 @@ impl ControlMessageHandler { }), }); - return Message{ - sending_connector: self_connector_id, - receiving_port: peer_port_id, - contents: MessageContents::Control(ControlMessage{ - id, - content: ControlMessageVariant::CloseChannel(peer_port_id), - }), + return ControlMessageFancy{ + id, + sending_component_id: self_connector_id, + content: ControlContent::CloseChannel(peer_port_id), }; } @@ -645,7 +593,7 @@ impl ControlMessageHandler { port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId, peer_connector_id: ConnectorId, new_owner_connector_id: ConnectorId - ) -> Message { + ) -> ControlMessageFancy { let id = self.take_id(); self.active.push(ControlEntry{ @@ -657,13 +605,10 @@ impl ControlMessageHandler { }), }); - return Message{ - sending_connector: self_connector_id, - receiving_port: peer_port_id, - contents: MessageContents::Control(ControlMessage{ - id, - content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id), - }) + return ControlMessageFancy{ + id, + sending_component_id: self_connector_id, + content: ControlContent::PortPeerChanged(peer_port_id, new_owner_connector_id), }; }