diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs index 82eea098b06518f068ab584ac20f1988260ce872..37ecca56cc66c87b0c6ec74d7295cc0d4bb1a350 100644 --- a/src/runtime2/connector2.rs +++ b/src/runtime2/connector2.rs @@ -96,7 +96,7 @@ impl Connector for ConnectorPDL { } impl ConnectorPDL { - pub fn new(initial: ComponentState, owned_ports: Vec) -> Self { + pub fn new(initial: ComponentState) -> Self { Self{ tree: ExecTree::new(initial), consensus: Consensus::new(), @@ -129,7 +129,7 @@ impl ConnectorPDL { let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.clone()); - self.consensus.notify_of_received_message(branch_id, &message.data_header); + self.consensus.notify_of_received_message(branch_id, &message.data_header, &message.content); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); @@ -303,8 +303,9 @@ impl ConnectorPDL { definition_id, monomorph_idx, arguments ), }; - let new_component = ConnectorPDL::new(new_state, comp_ctx.workspace_ports.clone()); - comp_ctx.push_component(new_component); + let new_component = ConnectorPDL::new(new_state); + comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone()); + comp_ctx.workspace_ports.clear(); return ConnectorScheduling::Later; }, diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index d767ac3b32f5ba86419cf6db6ed055335bd16ca1..110d934e910921dd947b8b2fd8a6e042dc285ace 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -165,6 +165,7 @@ impl Consensus { let port_info = ctx.get_port_by_id(source_port_id).unwrap(); let data_header = DataHeader{ expected_mapping: branch.port_mapping.clone(), + sending_port: port_info.peer_id, target_port: port_info.peer_id, new_mapping: branch_id }; diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index 6c7f03c300f50335fb7e8b048ba18604d8cfb2bd..e98a76f7fc9be937aa92ed6b03eb8907fdc9bc37 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -17,6 +17,7 @@ use std::sync::Mutex; use super::ConnectorId; use crate::protocol::eval::ValueGroup; +use crate::runtime2::inbox2::MessageFancy; use super::connector::BranchId; use super::port::PortIdLocal; @@ -215,7 +216,7 @@ pub struct Message { // TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. // Should behave as a MPSC queue. pub struct PublicInbox { - messages: Mutex>, + messages: Mutex>, } impl PublicInbox { @@ -225,12 +226,12 @@ impl PublicInbox { } } - pub fn insert_message(&self, message: Message) { + pub fn insert_message(&self, message: MessageFancy) { let mut lock = self.messages.lock().unwrap(); lock.push_back(message); } - pub fn take_message(&self) -> Option { + pub fn take_message(&self) -> Option { let mut lock = self.messages.lock().unwrap(); return lock.pop_front(); } diff --git a/src/runtime2/inbox2.rs b/src/runtime2/inbox2.rs index d07e95d6839724f9c0b4ae5066a4c42688dc893b..ff2d65a34b542ca74445a72bbbb3499091ce8011 100644 --- a/src/runtime2/inbox2.rs +++ b/src/runtime2/inbox2.rs @@ -3,7 +3,9 @@ use crate::runtime2::branch::BranchId; use crate::runtime2::ConnectorId; use crate::runtime2::port::PortIdLocal; -#[derive(Copy, Clone)] +// TODO: Remove Debug derive from all types + +#[derive(Debug, Copy, Clone)] pub(crate) struct PortAnnotation { pub port_id: PortIdLocal, pub registered_id: Option, @@ -11,32 +13,38 @@ pub(crate) struct PortAnnotation { } /// The header added by the synchronization algorithm to all. +#[derive(Debug, Clone)] pub(crate) struct SyncHeader { pub sending_component_id: ConnectorId, pub highest_component_id: ConnectorId, } /// The header added to data messages +#[derive(Debug, Clone)] pub(crate) struct DataHeader { pub expected_mapping: Vec, + pub sending_port: PortIdLocal, pub target_port: PortIdLocal, pub new_mapping: BranchId, } /// A data message is a message that is intended for the receiver's PDL code, /// but will also be handled by the consensus algrorithm +#[derive(Debug, Clone)] pub(crate) struct DataMessageFancy { pub sync_header: SyncHeader, pub data_header: DataHeader, pub content: ValueGroup, } +#[derive(Debug)] pub(crate) enum SyncContent { } /// A sync message is a message that is intended only for the consensus /// algorithm. +#[derive(Debug)] pub(crate) struct SyncMessageFancy { pub sync_header: SyncHeader, pub content: SyncContent, @@ -44,11 +52,14 @@ pub(crate) struct SyncMessageFancy { /// A control message is a message intended for the scheduler that is executing /// a component. +#[derive(Debug)] pub(crate) struct ControlMessageFancy { pub id: u32, // generic identifier, used to match request to response + pub sending_component_id: ConnectorId, pub content: ControlContent, } +#[derive(Debug)] pub(crate) enum ControlContent { PortPeerChanged(PortIdLocal, ConnectorId), CloseChannel(PortIdLocal), @@ -57,6 +68,7 @@ pub(crate) enum ControlContent { } /// Combination of data message and control messages. +#[derive(Debug)] pub(crate) enum MessageFancy { Data(DataMessageFancy), Sync(SyncMessageFancy), diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index 1fb96a52dc243346639b4e20266a1edb5f76d996..04b3ae6015e1ecb90d14256a31ec060c1847d2bc 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -25,9 +25,10 @@ use crate::collections::RawVec; use crate::ProtocolDescription; use inbox::Message; -use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling}; +use connector2::{ConnectorPDL, ConnectorPublic, ConnectorScheduling}; use scheduler::{Scheduler, ControlMessageHandler}; use native::{Connector, ConnectorApplication, ApplicationInterface}; +use crate::runtime2::inbox2::MessageFancy; use crate::runtime2::port::{Port, PortState}; use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx}; @@ -244,7 +245,7 @@ impl RuntimeInner { /// Sends a message to a particular connector. If the connector happened to /// be sleeping then it will be scheduled for execution. - pub(crate) fn send_message(&self, target_id: ConnectorId, message: Message) { + pub(crate) fn send_message(&self, target_id: ConnectorId, message: MessageFancy) { let target = self.get_component_public(target_id); target.inbox.insert_message(message); diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 19371b8bea7327ad1854c21af335f37ebd077341..50329ff5cfc43f4f43ca9f68633c9bdcfd98c638 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,16 +1,14 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::atomic::Ordering; -use crate::runtime2::inbox2::{DataMessageFancy, MessageFancy}; +use crate::runtime2::inbox2::ControlContent; use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey, ConnectorVariant}; use super::port::{Port, PortState, PortIdLocal}; use super::native::Connector; -use super::connector::{BranchId, ConnectorPDL, ConnectorScheduling}; -use super::inbox::{ - Message, MessageContents, ControlMessageVariant, - DataMessage, ControlMessage, SolutionMessage, SyncMessage -}; +use super::branch::{BranchId}; +use super::connector2::{ConnectorPDL, ConnectorScheduling}; +use super::inbox2::{MessageFancy, DataMessageFancy, SyncMessageFancy, ControlMessageFancy}; // Because it contains pointers we're going to do a copy by value on this one #[derive(Clone, Copy)] @@ -108,7 +106,7 @@ impl Scheduler { connector_id ); self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message)); - self.runtime.send_message(port.peer_connector, message); + self.runtime.send_message(port.peer_connector, MessageFancy::Control(message)); } } @@ -140,10 +138,10 @@ impl Scheduler { // Handle special messages here, messages for the component // will be added to the inbox. self.debug_conn(connector_id, " ... Handling message myself"); - match message.contents { - MessageContents::Control(content) => { - match content.content { - ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { + match message { + MessageFancy::Control(message) => { + match message.content { + ControlContent::PortPeerChanged(port_id, new_target_connector_id) => { // Need to change port target let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap(); port.peer_connector = new_target_connector_id; @@ -155,43 +153,34 @@ impl Scheduler { debug_assert!(scheduled.ctx_fancy.outbox.is_empty()); // And respond with an Ack - let ack_message = Message{ - sending_connector: connector_id, - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Control(ControlMessage{ - id: content.id, - content: ControlMessageVariant::Ack, - }), - }; + let ack_message = MessageFancy::Control(ControlMessageFancy{ + id: content.id, + sending_component_id: connector_id, + content: ControlContent::Ack, + }); self.debug_conn(connector_id, &format!("Sending message [pp ack]\n --- {:?}", ack_message)); - self.runtime.send_message(message.sending_connector, ack_message); + self.runtime.send_message(message.sending_component_id, ack_message); }, - ControlMessageVariant::CloseChannel(port_id) => { + ControlContent::CloseChannel(port_id) => { // Mark the port as being closed let port = scheduled.ctx_fancy.get_port_mut_by_id(port_id).unwrap(); port.state = PortState::Closed; // Send an Ack - let ack_message = Message{ - sending_connector: connector_id, - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Control(ControlMessage{ - id: content.id, - content: ControlMessageVariant::Ack, - }), - }; + let ack_message = MessageFancy::Control(ControlMessageFancy{ + id: content.id, + sending_component_id: connector_id, + content: ControlContent::Ack, + }); self.debug_conn(connector_id, &format!("Sending message [cc ack] \n --- {:?}", ack_message)); - self.runtime.send_message(message.sending_connector, ack_message); + self.runtime.send_message(message.sending_component_id, ack_message); }, - ControlMessageVariant::Ack => { + ControlContent::Ack => { scheduled.router.handle_ack(content.id); - } + }, + ControlContent::Ping => {}, } }, - MessageContents::Ping => { - // Pings are sent just to wake up a component, so - // nothing to do here. - }, _ => { // All other cases have to be handled by the component scheduled.ctx_fancy.inbox_messages.push(message); @@ -203,87 +192,51 @@ impl Scheduler { /// Handles changes to the context that were made by the component. This is /// the way (due to Rust's borrowing rules) that we bubble up changes in the /// component's state that the scheduler needs to know about (e.g. a message - /// that the component wants to send). + /// that the component wants to send, a port that has been added). fn handle_changes_in_context(&mut self, scheduled: &mut ScheduledConnector) { let connector_id = scheduled.ctx_fancy.id; // Handling any messages that were sent while let Some(mut message) = scheduled.ctx_fancy.outbox.pop_front() { - // Based on the message contents, decide where the message - // should be sent to. This might end up modifying the message. self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message)); - let (peer_connector, self_port, peer_port) = match &mut message { - MessageContents::Data(contents) => { - let port = scheduled.ctx_fancy.get_port_by_id(contents.sending_port).unwrap(); - (port.peer_connector, contents.sending_port, port.peer_id) - }, - MessageContents::Sync(contents) => { - let connector = contents.to_visit.pop().unwrap(); - (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) - }, - MessageContents::RequestCommit(contents)=> { - let connector = contents.to_visit.pop().unwrap(); - (connector, PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) - }, - MessageContents::ConfirmCommit(contents) => { - for to_visit in &contents.to_visit { - let message = Message{ - sending_connector: scheduled.ctx_fancy.id, - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::ConfirmCommit(contents.clone()), - }; - self.runtime.send_message(*to_visit, message); + + let target_component_id = match &message { + MessageFancy::Data(content) => { + // Data messages are always sent to a particular port, and + // may end up being rerouted. + let port_desc = scheduled.ctx_fancy.get_port_by_id(content.data_header.sending_port).unwrap(); + debug_assert_eq!(port_desc.peer_id, content.data_header.target_port); + + if port_desc.state == PortState::Closed { + todo!("handle sending over a closed port") } - (ConnectorId::new_invalid(), PortIdLocal::new_invalid(), PortIdLocal::new_invalid()) + + port_desc.peer_connector }, - MessageContents::Control(_) | MessageContents::Ping => { - // Never generated by the user's code - unreachable!(); + MessageFancy::Sync(content) => { + // Sync messages are always sent to a particular component, + // the sender must make sure it actually wants to send to + // the specified component (and is not using an inconsistent + // component ID associated with a port). + content.sync_header.highest_component_id + }, + MessageFancy::Control(_) => { + unreachable!("component sending control messages directly"); } }; - // TODO: Maybe clean this up, perhaps special case for - // ConfirmCommit can be handled differently. - if peer_connector.is_valid() { - if peer_port.is_valid() { - // Sending a message to a port, so the port may not be - // closed. - let port = scheduled.ctx_fancy.get_port_by_id(self_port).unwrap(); - match port.state { - PortState::Open => {}, - PortState::Closed => { - todo!("Handling sending over a closed port"); - } - } - } - let message = Message { - sending_connector: scheduled.ctx_fancy.id, - receiving_port: peer_port, - contents: message, - }; - self.runtime.send_message(peer_connector, message); - } + self.runtime.send_message(target_component_id, message); } while let Some(state_change) = scheduled.ctx_fancy.state_changes.pop_front() { match state_change { - ComponentStateChange::CreatedComponent(component) => { + ComponentStateChange::CreatedComponent(component, initial_ports) => { // Add the new connector to the global registry let new_key = self.runtime.create_pdl_component(component, false); let new_connector = self.runtime.get_component_private(&new_key); // Transfer ports - // TODO: Clean this up the moment native components are somewhat - // properly implemented. We need to know about the ports that - // are "owned by the PDL code", and then make sure that the - // context contains a description of those ports. - let ports = if let ConnectorVariant::UserDefined(connector) = &new_connector.connector { - &connector.ports.owned_ports - } else { - unreachable!(); - }; - - for port_id in ports { + for port_id in initial_ports { // Transfer messages associated with the transferred port let mut message_idx = 0; while message_idx < scheduled.ctx_fancy.inbox_messages.len() { @@ -311,7 +264,7 @@ impl Scheduler { ); self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message)); - self.runtime.send_message(port.peer_connector, reroute_message); + self.runtime.send_message(port.peer_connector, MessageFancy::Control(reroute_message)); } // Schedule new connector to run @@ -386,7 +339,7 @@ impl Scheduler { // ----------------------------------------------------------------------------- enum ComponentStateChange { - CreatedComponent(ConnectorPDL), + CreatedComponent(ConnectorPDL, Vec), CreatedPort(Port), ChangedPort(ComponentPortChange), } @@ -412,20 +365,14 @@ pub(crate) struct ComponentCtxFancy { is_in_sync: bool, changed_in_sync: bool, outbox: VecDeque, - state_changes: VecDeque + state_changes: VecDeque, // Workspaces that may be used by components to (generally) prevent // allocations. Be a good scout and leave it empty after you've used it. + // TODO: Move to scheduler ctx, this is the wrong place pub workspace_ports: Vec, pub workspace_branches: Vec, } -pub(crate) enum ReceivedMessage { - Data((PortIdLocal, DataMessage)), - Sync(SyncMessage), - RequestCommit(SolutionMessage), - ConfirmCommit(SolutionMessage), -} - impl ComponentCtxFancy { pub(crate) fn new_empty() -> Self { return Self{ @@ -437,14 +384,16 @@ impl ComponentCtxFancy { changed_in_sync: false, outbox: VecDeque::new(), state_changes: VecDeque::new(), + workspace_ports: Vec::new(), + workspace_branches: Vec::new(), }; } /// Notify the runtime that the component has created a new component. May /// only be called outside of a sync block. - pub(crate) fn push_component(&mut self, component: ConnectorPDL) { + pub(crate) fn push_component(&mut self, component: ConnectorPDL, initial_ports: Vec) { debug_assert!(!self.is_in_sync); - self.state_changes.push_back(ComponentStateChange::CreatedComponent(component)); + self.state_changes.push_back(ComponentStateChange::CreatedComponent(component, initial_ports)); } /// Notify the runtime that the component has created a new port. May only @@ -520,19 +469,21 @@ impl ComponentCtxFancy { if !self.is_in_sync { return None; } if self.inbox_len_read == self.inbox_messages.len() { return None; } + // We want to keep data messages in the inbox, because we need to check + // them in the future. We don't want to keep sync messages around, we + // should only handle them once. Control messages should never be in + // here. let message = &self.inbox_messages[self.inbox_len_read]; - if let MessageContents::Data(contents) = &message.contents { - self.inbox_len_read += 1; - return Some(ReceivedMessage::Data((message.receiving_port, contents.clone()))); - } else { - // Must be a sync/solution message - let message = self.inbox_messages.remove(self.inbox_len_read); - return match message.contents { - MessageContents::Sync(v) => Some(ReceivedMessage::Sync(v)), - MessageContents::RequestCommit(v) => Some(ReceivedMessage::RequestCommit(v)), - MessageContents::ConfirmCommit(v) => Some(ReceivedMessage::ConfirmCommit(v)), - _ => unreachable!(), // because we only put data/synclike messages in the inbox - } + match message { + MessageFancy::Data(content) => { + self.inbox_len_read += 1; + return Some(MessageFancy::Data(content.clone())); + }, + MessageFancy::Sync(_) => { + let message = self.inbox_messages.remove(self.inbox_len_read); + return Some(message); + }, + MessageFancy::Control(_) => unreachable!("control message ended up in component inbox"), } } } @@ -616,7 +567,7 @@ impl ControlMessageHandler { pub fn prepare_closing_channel( &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId - ) -> Message { + ) -> ControlMessageFancy { let id = self.take_id(); self.active.push(ControlEntry{ @@ -627,13 +578,10 @@ impl ControlMessageHandler { }), }); - return Message{ - sending_connector: self_connector_id, - receiving_port: peer_port_id, - contents: MessageContents::Control(ControlMessage{ - id, - content: ControlMessageVariant::CloseChannel(peer_port_id), - }), + return ControlMessageFancy{ + id, + sending_component_id: self_connector_id, + content: ControlContent::CloseChannel(peer_port_id), }; } @@ -645,7 +593,7 @@ impl ControlMessageHandler { port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId, peer_connector_id: ConnectorId, new_owner_connector_id: ConnectorId - ) -> Message { + ) -> ControlMessageFancy { let id = self.take_id(); self.active.push(ControlEntry{ @@ -657,13 +605,10 @@ impl ControlMessageHandler { }), }); - return Message{ - sending_connector: self_connector_id, - receiving_port: peer_port_id, - contents: MessageContents::Control(ControlMessage{ - id, - content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id), - }) + return ControlMessageFancy{ + id, + sending_component_id: self_connector_id, + content: ControlContent::PortPeerChanged(peer_port_id, new_owner_connector_id), }; }