diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index e1f59e016d3faae5850be6fe58a89953f32971a2..efbc77c9238a171452b8f6c5708f021ae5350bff 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,19 +1,95 @@ use std::sync::Arc; use std::sync::Condvar; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Duration; use std::thread; use crate::ProtocolDescription; use crate::runtime2::global_store::ConnectorVariant; +use crate::runtime2::inbox::MessageContents; use crate::runtime2::native::Connector; +use crate::runtime2::port::{Channel, PortKind, PortOwnership}; use super::RuntimeInner; -use super::port::{PortIdLocal}; +use super::port::{Port, PortIdLocal}; use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant}; use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; use super::global_store::{ConnectorKey, ConnectorId, GlobalStore}; +/// Contains fields that are mostly managed by the scheduler, but may be +/// accessed by the connector +pub(crate) struct ConnectorCtx { + pub(crate) id: ConnectorId, + port_counter: Arc, + pub(crate) ports: Vec, +} + +impl ConnectorCtx { + pub(crate) fn new(port_counter: Arc) -> ConnectorCtx { + Self{ + id: ConnectorId::new_invalid(), + port_counter, + ports: Vec::new(), + } + } + + /// Creates a (putter, getter) port pair belonging to the same channel. The + /// port will be implicitly owned by the connector. + pub(crate) fn create_channel(&mut self) -> Channel { + let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); + let putter_id = PortIdLocal::new(getter_id + 1); + let getter_id = PortIdLocal::new(getter_id); + + self.ports.push(Port{ + self_id: getter_id, + peer_id: putter_id, + kind: PortKind::Getter, + owning_connector: self.id, + peer_connector: self.id, + }); + + self.ports.push(Port{ + self_id: putter_id, + peer_id: getter_id, + kind: PortKind::Putter, + owning_connector: self.id, + peer_connector: self.id, + }); + + return Channel{ getter_id, putter_id }; + } + + pub(crate) fn add_port(&mut self, port: Port) { + debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id)); + self.ports.push(port); + } + + pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port { + let index = self.port_id_to_index(id); + return self.ports.remove(index); + } + + pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port { + let index = self.port_id_to_index(id); + return &self.ports[index]; + } + + pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port { + let index = self.port_id_to_index(id); + return &mut self.ports[index]; + } + + fn port_id_to_index(&self, id: PortIdLocal) -> usize { + for (idx, port) in self.ports.iter().enumerate() { + if port.self_id == id { + return idx; + } + } + + panic!("port {:?}, not owned by connector", id); + } +} + pub(crate) struct Scheduler { runtime: Arc, } @@ -57,76 +133,64 @@ impl Scheduler { while cur_schedule == ConnectorScheduling::Immediate { // Check all the message that are in the shared inbox while let Some(message) = scheduled.public.inbox.take_message() { - // TODO: Put header in front of messages, this is a mess - match message { - Message::Data(message) => { + match message.contents { + MessageContents::Data(content) => { // Check if we need to reroute, or can just put it // in the private inbox of the connector - if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.sending_port) { - self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message)); + if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, content.sending_port) { + self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(content)); } else { - scheduled.connector.insert_data_message(message); + scheduled.connector.insert_data_message(content); } + } + MessageContents::Sync(content) => { + scheduled.connector.insert_sync_message(content, &scheduled.context, &mut delta_state); + } + MessageContents::Solution(content) => { + // TODO: Handle solution message }, - Message::Sync(message) => { - // TODO: Come back here after rewriting port ownership stuff - if let Some(other_connector_id) = scheduled.router.should_reroute() - }, - Message::Solution(solution) => { - - }, - Message::Control(message) => { - match message.content { + MessageContents::Control(content) => { + match content.content { ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { // Need to change port target - let port = self.runtime.global_store.ports.get(&connector_key, port_id); + let port = scheduled.context.get_port_mut(port_id); port.peer_connector = new_target_connector_id; debug_assert!(delta_state.outbox.is_empty()); // And respond with an Ack + // Note: after this code has been reached, we may not have any + // messages in the outbox that send to the port whose owning + // connector we just changed. This is because the `ack` will + // clear the rerouting entry of the `ack`-receiver. self.send_message_and_wake_up_if_sleeping( - message.sender, - Message::Control(ControlMessage{ - id: message.id, - sender: connector_key.downcast(), - content: ControlMessageVariant::Ack, - }) + content.sender, + Message{ + sending_connector: connector_key.downcast(), + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Control(ControlMessage{ + id: content.id, + content: ControlMessageVariant::Ack, + }), + } ); }, ControlMessageVariant::Ack => { - scheduled.router.handle_ack(message.id); + scheduled.router.handle_ack(content.id); } } - }, + } Message::Ping => {}, } } // Actually run the connector - // TODO: Revise - let new_schedule; - match &mut scheduled.connector { - ConnectorVariant::UserDefined(connector) => { - if connector.is_in_sync_mode() { - // In synchronous mode, so we can expect messages being sent, - // but we never expect the creation of connectors - new_schedule = connector.run_in_speculative_mode(&self.runtime.protocol_description, &mut delta_state); - debug_assert!(delta_state.new_connectors.is_empty()); - } else { - // In regular running mode (not in a sync block) we cannot send - // messages but we can create new connectors - new_schedule = connector.run_in_deterministic_mode(&self.runtime.protocol_description, &mut delta_state); - debug_assert!(delta_state.outbox.is_empty()); - } - }, - ConnectorVariant::Native(connector) => { - new_schedule = connector.run(&self.runtime.protocol_description); - }, - } + let new_schedule = scheduled.connector.run( + &self.runtime.protocol_description, &scheduled.context, &mut delta_state + ); // Handle all of the output from the current run: messages to // send and connectors to instantiate. - self.handle_delta_state(&connector_key, &mut delta_state); + self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state); cur_schedule = new_schedule; } @@ -164,26 +228,54 @@ impl Scheduler { } } - fn handle_delta_state(&mut self, connector_key: &ConnectorKey, delta_state: &mut RunDeltaState) { + fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) { // Handling any messages that were sent + let connector_id = connector_key.downcast(); + if !delta_state.outbox.is_empty() { - for message in delta_state.outbox.drain(..) { - let (inbox_message, target_connector_id) = { - let sending_port = self.runtime.global_store.ports.get(&connector_key, message.sending_port); - ( - DataMessage { - sending_connector: connector_key.downcast(), - sending_port: sending_port.self_id, - receiving_port: sending_port.peer_id, - sender_prev_branch_id: message.sender_prev_branch_id, - sender_cur_branch_id: message.sender_cur_branch_id, - message: message.message, - }, - sending_port.peer_connector, - ) + for mut message in delta_state.outbox.drain(..) { + // Based on the message contents, decide where the message + // should be sent to. This might end up modifying the message. + let (peer_connector, peer_port) = match &mut message { + MessageContents::Data(contents) => { + let port = context.get_port(contents.sending_port); + (port.peer_connector, port.peer_id) + }, + MessageContents::Sync(contents) => { + let connector = contents.to_visit.pop().unwrap(); + (connector, PortIdLocal::new_invalid()) + }, + MessageContents::RequestCommit(contents)=> { + let connector = contents.to_visit.pop().unwrap(); + (connector, PortIdLocal::new_invalid()) + }, + MessageContents::ConfirmCommit(contents) => { + for to_visit in &contents.to_visit { + let message = Message{ + sending_connector: connector_id, + receiving_port: PortIdLocal::new_invalid(), + contents: contents.clone(), + }; + self.send_message_and_wake_up_if_sleeping(*to_visit, message); + } + (ConnectorId::new_invalid(), PortIdLocal::new_invalid()) + }, + MessageContents::Control(_) | MessageContents::Ping => { + // Never generated by the user's code + unreachable!(); + } }; - self.send_message_and_wake_up_if_sleeping(target_connector_id, Message::Data(inbox_message)); + // TODO: Maybe clean this up, perhaps special case for + // ConfirmCommit can be handled differently. + if peer_connector.is_valid() { + let message = Message { + sending_connector: connector_id, + receiving_port: peer_port, + contents: message, + }; + self.send_message_and_wake_up_if_sleeping(peer_connector, message); + } } } @@ -194,25 +286,16 @@ impl Scheduler { for new_connector in delta_state.new_connectors.drain(..) { // Add to global registry to obtain key - let new_key = self.runtime.global_store.connectors.create(ConnectorVariant::UserDefined(new_connector)); + let new_key = self.runtime.global_store.connectors.create(cur_connector, ConnectorVariant::UserDefined(new_connector)); let new_connector = self.runtime.global_store.connectors.get_mut(&new_key); - // Each port should be lost by the connector that created the - // new one. Note that the creator is the current owner. - for port_id in &new_connector.ports.owned_ports { - debug_assert!(!cur_connector.ports.owned_ports.contains(port_id)); - - // Modify ownership, retrieve peer connector - let (peer_connector_id, peer_port_id) = { - let mut port = self.runtime.global_store.ports.get(connector_key, *port_id); - port.owning_connector = new_key.downcast(); - - (port.peer_connector, port.peer_id) - }; - - // Send message that port has changed ownership + // Call above changed ownership of ports, but we still have to + // let the other end of the channel know that the port has + // changed location. + for port in &new_connector.context.ports { let reroute_message = cur_connector.router.prepare_reroute( - port_id, peer_port_id, connector_key.downcast(), peer_connector_id, new_key.downcast() + port.self_id, port.peer_id, cur_connector.context.id, + port.peer_connector, new_connector.context.id ); self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message); @@ -282,7 +365,6 @@ impl Router { return Message::Control(ControlMessage{ id, - sender: self_connector_id, content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id) }); }