diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 233e0c0c7c0b02205d8b021eff89ef8010d7d50c..8f2ac8ae030d9f193e325dd32274a788ef182fe0 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,20 +1,18 @@ use std::sync::Arc; -use std::sync::Condvar; use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Duration; use std::thread; -use crate::ProtocolDescription; use crate::runtime2::global_store::ConnectorVariant; use crate::runtime2::inbox::MessageContents; use crate::runtime2::native::Connector; -use crate::runtime2::port::{Channel, PortKind, PortOwnership}; +use crate::runtime2::port::{Channel, PortKind}; use super::RuntimeInner; use super::port::{Port, PortIdLocal}; -use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant}; -use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; -use super::global_store::{ConnectorKey, ConnectorId, GlobalStore}; +use super::inbox::{Message, ControlMessage, ControlMessageVariant}; +use super::connector::{ConnectorScheduling, RunDeltaState}; +use super::global_store::{ConnectorKey, ConnectorId}; /// Contains fields that are mostly managed by the scheduler, but may be /// accessed by the connector @@ -29,7 +27,7 @@ impl ConnectorCtx { Self{ id: ConnectorId::new_invalid(), port_counter, - ports: initial_ports, + ports: Vec::new(), } } @@ -131,53 +129,46 @@ impl Scheduler { while cur_schedule == ConnectorScheduling::Immediate { // Check all the message that are in the shared inbox while let Some(message) = scheduled.public.inbox.take_message() { - match message.contents { - MessageContents::Data(content) => { - // Check if we need to reroute, or can just put it - // in the private inbox of the connector - if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, content.sending_port) { - self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(content)); - } else { - scheduled.connector.insert_data_message(content); - } - } - MessageContents::Sync(content) => { - scheduled.connector.insert_sync_message(content, &scheduled.context, &mut delta_state); - } - MessageContents::Solution(content) => { - // TODO: Handle solution message - }, - MessageContents::Control(content) => { - match content.content { - ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { - // Need to change port target - let port = scheduled.context.get_port_mut(port_id); - port.peer_connector = new_target_connector_id; - debug_assert!(delta_state.outbox.is_empty()); - - // And respond with an Ack - // Note: after this code has been reached, we may not have any - // messages in the outbox that send to the port whose owning - // connector we just changed. This is because the `ack` will - // clear the rerouting entry of the `ack`-receiver. - self.send_message_and_wake_up_if_sleeping( - content.sender, - Message{ - sending_connector: connector_key.downcast(), - receiving_port: PortIdLocal::new_invalid(), - contents: MessageContents::Control(ControlMessage{ - id: content.id, - content: ControlMessageVariant::Ack, - }), - } - ); - }, - ControlMessageVariant::Ack => { - scheduled.router.handle_ack(content.id); - } + // Check for rerouting + if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.receiving_port) { + self.send_message_and_wake_up_if_sleeping(other_connector_id, message); + continue; + } + + // Check for messages that requires special action from the + // scheduler. + if let MessageContents::Control(content) = message.contents { + match content.content { + ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { + // Need to change port target + let port = scheduled.context.get_port_mut(port_id); + port.peer_connector = new_target_connector_id; + debug_assert!(delta_state.outbox.is_empty()); + + // And respond with an Ack + // Note: after this code has been reached, we may not have any + // messages in the outbox that send to the port whose owning + // connector we just changed. This is because the `ack` will + // clear the rerouting entry of the `ack`-receiver. + self.send_message_and_wake_up_if_sleeping( + message.sending_connector, + Message{ + sending_connector: connector_key.downcast(), + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Control(ControlMessage{ + id: content.id, + content: ControlMessageVariant::Ack, + }), + } + ); + }, + ControlMessageVariant::Ack => { + scheduled.router.handle_ack(content.id); } } - Message::Ping => {}, + } else { + // Let connector handle message + scheduled.connector.handle_message(message.contents, &scheduled.context, &mut delta_state); } } @@ -252,7 +243,7 @@ impl Scheduler { let message = Message{ sending_connector: connector_id, receiving_port: PortIdLocal::new_invalid(), - contents: contents.clone(), + contents: MessageContents::ConfirmCommit(contents.clone()), }; self.send_message_and_wake_up_if_sleeping(*to_visit, message); } @@ -277,6 +268,12 @@ impl Scheduler { } } + if !delta_state.new_ports.is_empty() { + for port in delta_state.new_ports.drain(..) { + context.ports.push(port); + } + } + // Handling any new connectors that were scheduled // TODO: Pool outgoing messages to reduce atomic access if !delta_state.new_connectors.is_empty() { @@ -284,7 +281,7 @@ impl Scheduler { for new_connector in delta_state.new_connectors.drain(..) { // Add to global registry to obtain key - let new_key = self.runtime.global_store.connectors.create(cur_connector, ConnectorVariant::UserDefined(new_connector)); + let new_key = self.runtime.global_store.connectors.create_pdl(cur_connector, new_connector); let new_connector = self.runtime.global_store.connectors.get_mut(&new_key); // Call above changed ownership of ports, but we still have to @@ -296,7 +293,7 @@ impl Scheduler { port.peer_connector, new_connector.context.id ); - self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message); + self.send_message_and_wake_up_if_sleeping(port.peer_connector, reroute_message); } // Schedule new connector to run @@ -305,7 +302,7 @@ impl Scheduler { } } - pub fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { + fn send_message_and_wake_up_if_sleeping(&self, connector_id: ConnectorId, message: Message) { let connector = self.runtime.global_store.connectors.get_shared(connector_id); connector.inbox.insert_message(message); @@ -324,7 +321,7 @@ impl Scheduler { // TODO: Optimize struct ReroutedTraffic { id: u32, // ID of control message - port: PortIdLocal, // targeted port + target_port: PortIdLocal, // targeted port source_connector: ConnectorId, // connector we expect messages from target_connector: ConnectorId, // connector they should be rerouted to } @@ -356,23 +353,27 @@ impl Router { self.active.push(ReroutedTraffic{ id, - port: port_id, + target_port: port_id, source_connector: peer_connector_id, target_connector: new_owner_connector_id, }); - return Message::Control(ControlMessage{ - id, - content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id) - }); + return Message{ + sending_connector: self_connector_id, + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Control(ControlMessage{ + id, + content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id), + }) + }; } /// Returns true if the supplied message should be rerouted. If so then this /// function returns the connector that should retrieve this message. - pub fn should_reroute(&self, sending_connector: ConnectorId, sending_port: PortIdLocal) -> Option { + pub fn should_reroute(&self, sending_connector: ConnectorId, target_port: PortIdLocal) -> Option { for reroute in &self.active { if reroute.source_connector == sending_connector && - reroute.port == sending_port { + reroute.target_port == target_port { // Need to reroute this message return Some(reroute.target_connector); }