diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index adf1a5be8bd2bca197139e5483975ede611e42e7..2a7873e8371a94bc7fc194b93f75d5cdeee02729 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,14 +1,13 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::atomic::Ordering; -use crate::runtime2::inbox2::ControlContent; use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortState, PortIdLocal}; use super::native::Connector; use super::branch::{BranchId}; -use super::connector2::{ConnectorPDL, ConnectorScheduling}; -use super::inbox2::{MessageFancy, DataMessageFancy, ControlMessageFancy}; +use super::connector::{ConnectorPDL, ConnectorScheduling}; +use super::inbox::{Message, DataMessage, ControlMessage, ControlContent}; // Because it contains pointers we're going to do a copy by value on this one #[derive(Clone, Copy)] @@ -106,7 +105,7 @@ impl Scheduler { connector_id ); self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message)); - self.runtime.send_message(port.peer_connector, MessageFancy::Control(message)); + self.runtime.send_message(port.peer_connector, Message::Control(message)); } } @@ -142,7 +141,7 @@ impl Scheduler { self.debug_conn(connector_id, " ... Handling the message"); match message { - MessageFancy::Control(message) => { + Message::Control(message) => { match message.content { ControlContent::PortPeerChanged(port_id, new_target_connector_id) => { // Need to change port target @@ -156,7 +155,7 @@ impl Scheduler { debug_assert!(scheduled.ctx_fancy.outbox.is_empty()); // And respond with an Ack - let ack_message = MessageFancy::Control(ControlMessageFancy{ + let ack_message = Message::Control(ControlMessage { id: message.id, sending_component_id: connector_id, content: ControlContent::Ack, @@ -170,7 +169,7 @@ impl Scheduler { port.state = PortState::Closed; // Send an Ack - let ack_message = MessageFancy::Control(ControlMessageFancy{ + let ack_message = Message::Control(ControlMessage { id: message.id, sending_component_id: connector_id, content: ControlContent::Ack, @@ -204,7 +203,7 @@ impl Scheduler { self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message)); let target_component_id = match &message { - MessageFancy::Data(content) => { + Message::Data(content) => { // Data messages are always sent to a particular port, and // may end up being rerouted. let port_desc = scheduled.ctx_fancy.get_port_by_id(content.data_header.sending_port).unwrap(); @@ -216,14 +215,14 @@ impl Scheduler { port_desc.peer_connector }, - MessageFancy::Sync(content) => { + Message::Sync(content) => { // Sync messages are always sent to a particular component, // the sender must make sure it actually wants to send to // the specified component (and is not using an inconsistent // component ID associated with a port). content.target_component_id }, - MessageFancy::Control(_) => { + Message::Control(_) => { unreachable!("component sending control messages directly"); } }; @@ -269,7 +268,7 @@ impl Scheduler { ); self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message)); - self.runtime.send_message(port.peer_connector, MessageFancy::Control(reroute_message)); + self.runtime.send_message(port.peer_connector, Message::Control(reroute_message)); } // Schedule new connector to run @@ -330,11 +329,11 @@ impl Scheduler { } #[inline] - fn get_message_target_port(message: &MessageFancy) -> Option { + fn get_message_target_port(message: &Message) -> Option { match message { - MessageFancy::Data(data) => return Some(data.data_header.target_port), - MessageFancy::Sync(_) => {}, - MessageFancy::Control(control) => { + Message::Data(data) => return Some(data.data_header.target_port), + Message::Sync(_) => {}, + Message::Control(control) => { match &control.content { ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id), ControlContent::CloseChannel(port_id) => return Some(*port_id), @@ -377,16 +376,16 @@ pub(crate) struct ComponentPortChange { /// of it by the component itself. When the component starts a sync block or /// exits a sync block the partially managed state by both component and /// scheduler need to be exchanged. -pub(crate) struct ComponentCtxFancy { +pub(crate) struct ComponentCtx { // Mostly managed by the scheduler pub(crate) id: ConnectorId, ports: Vec, - inbox_messages: Vec, // never control or ping messages + inbox_messages: Vec, // never control or ping messages inbox_len_read: usize, // Submitted by the component is_in_sync: bool, changed_in_sync: bool, - outbox: VecDeque, + outbox: VecDeque, state_changes: VecDeque, // Workspaces that may be used by components to (generally) prevent // allocations. Be a good scout and leave it empty after you've used it. @@ -395,7 +394,7 @@ pub(crate) struct ComponentCtxFancy { pub workspace_branches: Vec, } -impl ComponentCtxFancy { +impl ComponentCtx { pub(crate) fn new_empty() -> Self { return Self{ id: ConnectorId::new_invalid(), @@ -457,7 +456,7 @@ impl ComponentCtxFancy { /// Submit a message for the scheduler to send to the appropriate receiver. /// May only be called inside of a sync block. - pub(crate) fn submit_message(&mut self, contents: MessageFancy) { + pub(crate) fn submit_message(&mut self, contents: Message) { debug_assert!(self.is_in_sync); self.outbox.push_back(contents); } @@ -491,7 +490,7 @@ impl ComponentCtxFancy { /// Retrieves the next unread message from the inbox `None` if there are no /// (new) messages to read. // TODO: Fix the clone of the data message, entirely unnecessary - pub(crate) fn read_next_message(&mut self) -> Option { + pub(crate) fn read_next_message(&mut self) -> Option { if !self.is_in_sync { return None; } if self.inbox_len_read == self.inbox_messages.len() { return None; } @@ -501,34 +500,34 @@ impl ComponentCtxFancy { // here. let message = &self.inbox_messages[self.inbox_len_read]; match message { - MessageFancy::Data(content) => { + Message::Data(content) => { self.inbox_len_read += 1; - return Some(MessageFancy::Data(content.clone())); + return Some(Message::Data(content.clone())); }, - MessageFancy::Sync(_) => { + Message::Sync(_) => { let message = self.inbox_messages.remove(self.inbox_len_read); return Some(message); }, - MessageFancy::Control(_) => unreachable!("control message ended up in component inbox"), + Message::Control(_) => unreachable!("control message ended up in component inbox"), } } } pub(crate) struct MessagesIter<'a> { - messages: &'a [MessageFancy], + messages: &'a [Message], next_index: usize, max_index: usize, match_port_id: PortIdLocal, } impl<'a> Iterator for MessagesIter<'a> { - type Item = &'a DataMessageFancy; + type Item = &'a DataMessage; fn next(&mut self) -> Option { // Loop until match is found or at end of messages while self.next_index < self.max_index { let message = &self.messages[self.next_index]; - if let MessageFancy::Data(message) = &message { + if let Message::Data(message) = &message { if message.data_header.target_port == self.match_port_id { // Found a match self.next_index += 1; @@ -593,7 +592,7 @@ impl ControlMessageHandler { pub fn prepare_closing_channel( &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId - ) -> ControlMessageFancy { + ) -> ControlMessage { let id = self.take_id(); self.active.push(ControlEntry{ @@ -604,7 +603,7 @@ impl ControlMessageHandler { }), }); - return ControlMessageFancy{ + return ControlMessage { id, sending_component_id: self_connector_id, content: ControlContent::CloseChannel(peer_port_id), @@ -619,7 +618,7 @@ impl ControlMessageHandler { port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId, peer_connector_id: ConnectorId, new_owner_connector_id: ConnectorId - ) -> ControlMessageFancy { + ) -> ControlMessage { let id = self.take_id(); self.active.push(ControlEntry{ @@ -631,7 +630,7 @@ impl ControlMessageHandler { }), }); - return ControlMessageFancy{ + return ControlMessage { id, sending_component_id: self_connector_id, content: ControlContent::PortPeerChanged(peer_port_id, new_owner_connector_id),