diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 0cd20a6adf9fa855d54512e54e2b32c76eb19717..22e82638e9e0b1d5468aae9428b89ec9c63248ec 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -5,9 +5,11 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::global_store::{ConnectorKey, GlobalStore}; -use crate::runtime2::inbox::{OutgoingMessage, SolutionMessage}; +use crate::runtime2::global_store::ConnectorKey; +use crate::runtime2::inbox::{MessageContents, OutgoingMessage, SolutionMessage}; +use crate::runtime2::native::Connector; use crate::runtime2::port::PortKind; +use crate::runtime2::scheduler::ConnectorCtx; use super::global_store::ConnectorId; use super::inbox::{ PrivateInbox, PublicInbox, OutgoingDataMessage, DataMessage, SyncMessage, @@ -97,6 +99,15 @@ impl Branch { ports_delta: parent_branch.ports_delta.clone(), } } + + fn commit_to_sync(&mut self) { + self.index = BranchId::new(0); + self.parent_index = BranchId::new_invalid(); + self.sync_state = SpeculativeState::RunningNonSync; + self.next_branch_in_queue = None; + self.received.clear(); + self.ports_delta.clear(); + } } #[derive(Clone)] @@ -141,8 +152,7 @@ enum PortOwnershipError { AlreadyGivenAway(PortIdLocal) } -/// As the name implies, this contains a description of the ports associated -/// with a connector. +/// Contains a description of the port mapping during a particular sync session. /// TODO: Extend documentation pub(crate) struct ConnectorPorts { // Essentially a mapping from `port_index` to `port_id`. @@ -184,6 +194,24 @@ impl ConnectorPorts { } } + /// Adds a new port. Caller must make sure that the connector is not in the + /// sync phase. + fn add_port(&mut self, port_id: PortIdLocal) { + debug_assert!(self.port_mapping.len() == self.owned_ports.len()); + debug_assert!(!self.owned_ports.contains(&port_id)); + self.owned_ports.push(port_id); + self.port_mapping.push(PortAssignment::new_unassigned()); + } + + /// Commits to a particular branch. Essentially just removes the port + /// mapping information generated during the sync phase. + fn commit_to_sync(&mut self) { + self.port_mapping.truncate(self.owned_ports.len()); + debug_assert!(self.port_mapping.iter().all(|v| { + !v.is_assigned && !v.last_registered_branch_id.is_valid() + })); + } + /// Removes a particular port from the connector. May only be done if the /// connector is in non-sync mode fn remove_port(&mut self, port_id: PortIdLocal) { @@ -251,14 +279,22 @@ struct BranchQueue { } impl BranchQueue { + #[inline] fn new() -> Self { Self{ first: 0, last: 0 } } + #[inline] fn is_empty(&self) -> bool { debug_assert!((self.first == 0) == (self.last == 0)); return self.first == 0; } + + #[inline] + fn clear(&mut self) { + self.first = 0; + self.last = 0; + } } /// Public fields of the connector that can be freely shared between multiple @@ -281,7 +317,6 @@ impl ConnectorPublic { // TODO: Do this outside of the connector, create a wrapping struct pub(crate) struct ConnectorPDL { // State and properties of connector itself - id: ConnectorId, in_sync: bool, // Branch management branches: Vec, // first branch is always non-speculative one @@ -289,7 +324,9 @@ pub(crate) struct ConnectorPDL { sync_pending_get: BranchQueue, sync_finished: BranchQueue, sync_finished_last_handled: u32, + cur_round: u32, // Port/message management + pub committed_to: Option<(ConnectorId, u64)>, pub inbox: PrivateInbox, pub ports: ConnectorPorts, } @@ -313,43 +350,112 @@ impl RunContext for TempCtx { } } +impl Connector for ConnectorPDL { + fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + use MessageContents as MC; + + match message { + MC::Data(message) => self.handle_data_message(message), + MC::Sync(message) => self.handle_sync_message(message, ctx, delta_state), + MC::RequestCommit(message) => self.handle_request_commit_message(message, ctx, delta_state), + MC::ConfirmCommit(message) => self.handle_confirm_commit_message(message, ctx, delta_state), + MC::Control(_) | MC::Ping => {}, + } + } + + fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + if self.in_sync { + let scheduling = self.run_in_speculative_mode(pd, ctx, results); + + // When in speculative mode we might have generated new sync + // solutions, we need to turn them into proposed solutions here. + if self.sync_finished_last_handled != self.sync_finished.last { + // Retrieve first element in queue + let mut next_id; + if self.sync_finished_last_handled == 0 { + next_id = self.sync_finished.first; + } else { + let last_handled = &self.branches[self.sync_finished_last_handled as usize]; + debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue" + next_id = last_handled.next_branch_in_queue.unwrap(); + } + + loop { + let branch_id = BranchId::new(next_id); + let branch = &self.branches[next_id as usize]; + let branch_next = branch.next_branch_in_queue; + + // Turn local solution into a message and send it along + // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed + let solution_message = self.generate_initial_solution_for_branch(branch_id, ctx); + if let Some(valid_solution) = solution_message { + self.submit_sync_solution(valid_solution, ctx, results); + } else { + // Branch is actually invalid, but we only just figured + // it out. We need to mark it as invalid to prevent + // future use + Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id); + if branch_id == self.sync_finished_last_handled { + self.sync_finished_last_handled = self.sync_finished.last; + } + + let branch = &mut self.branches[next_id as usize]; + branch.sync_state = SpeculativeState::Inconsistent; + } + + match branch_next { + Some(id) => next_id = id, + None => break, + } + } + + self.sync_finished_last_handled = next_id; + } + + return scheduling; + } else { + let scheduling = self.run_in_deterministic_mode(pd, ctx, results); + return scheduling; + } + } +} + impl ConnectorPDL { /// Constructs a representation of a connector. The assumption is that the /// initial branch is at the first instruction of the connector's code, - /// hence is in a non-sync state. Note that the initial ID is invalid, we - /// assume the connector will get inserted into the runtime, there it will - /// receive its ID. + /// hence is in a non-sync state. pub fn new(initial_branch: Branch, owned_ports: Vec) -> Self { Self{ - id: ConnectorId::new_invalid(), in_sync: false, branches: vec![initial_branch], sync_active: BranchQueue::new(), sync_pending_get: BranchQueue::new(), sync_finished: BranchQueue::new(), sync_finished_last_handled: 0, // none at all + cur_round: 0, + committed_to: None, inbox: PrivateInbox::new(), ports: ConnectorPorts::new(owned_ports), } } - pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { - debug_assert!(!self.id.is_valid()); // ID should only be set once - self.id = id; - } - pub fn is_in_sync_mode(&self) -> bool { return self.in_sync; } - pub fn insert_data_message(&mut self, message: DataMessage) { + // ------------------------------------------------------------------------- + // Handling connector messages + // ------------------------------------------------------------------------- + + #[inline] + pub fn handle_data_message(&mut self, message: DataMessage) { self.inbox.insert_message(message); } /// Accepts a synchronous message and combines it with the locally stored - /// solution(s). - pub fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, results: &mut RunDeltaState) { - debug_assert!(!message.to_visit.contains(&self.id)); // own ID already removed + /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate. + pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) { + debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints // TODO: Optimize, use some kind of temp workspace vector @@ -359,7 +465,7 @@ impl ConnectorPDL { // We have some solutions to match against let constraints_index = message.constraints .iter() - .position(|v| v.connector_id == self.id) + .position(|v| v.connector_id == ctx.id) .unwrap(); let constraints = &message.constraints[constraints_index].constraints; debug_assert!(!constraints.is_empty()); @@ -437,7 +543,7 @@ impl ConnectorPDL { // - replace constraints with a local solution new_solution.constraints.remove(constraints_index); new_solution.local_solutions.push(SyncConnectorSolution{ - connector_id: self.id, + connector_id: ctx.id, terminating_branch_id: BranchId::new(branch_index), execution_branch_ids: execution_path_branch_ids.clone(), final_port_mapping: new_solution_mapping, @@ -449,8 +555,7 @@ impl ConnectorPDL { let port_id = self.ports.get_port_id(port_index); let (peer_connector_id, peer_port_id, peer_is_getter) = { - let key = unsafe{ ConnectorKey::from_id(self.id) }; - let port = global.ports.get(&key, port_id); + let port = ctx.get_port(port_id); (port.peer_connector, port.peer_id, port.kind == PortKind::Putter) }; @@ -474,7 +579,7 @@ impl ConnectorPDL { // If here, then the newly generated solution is completely // compatible. - Self::submit_sync_solution(new_solution, results); + self.submit_sync_solution(new_solution, ctx, results); // Consider the next branch if branch_index == self.sync_finished_last_handled { @@ -488,76 +593,103 @@ impl ConnectorPDL { } } - // TODO: Remove GlobalStore, is used to retrieve ports. Ports belong with - // the connector itself, half managed, half accessible (a-la PublicInbox - // and PrivateInbox) - pub fn run(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling { - if self.in_sync { - let scheduling = self.run_in_speculative_mode(pd, global, results); - - // When in speculative mode we might have generated new sync - // solutions, we need to turn them into proposed solutions here. - if self.sync_finished_last_handled != self.sync_finished.last { - // Retrieve first element in queue - let mut next_id; - if self.sync_finished_last_handled == 0 { - next_id = self.sync_finished.first; - } else { - let last_handled = &self.branches[self.sync_finished_last_handled as usize]; - debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue" - next_id = last_handled.next_branch_in_queue.unwrap(); - } - - loop { - let branch_id = BranchId::new(next_id); - let branch = &self.branches[next_id as usize]; - let branch_next = branch.next_branch_in_queue; - - // Turn local solution into a message and send it along - // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed - let connector_key = unsafe{ ConnectorKey::from_id(self.id) }; - let solution_message = self.generate_initial_solution_for_branch(branch_id, &connector_key, global); - if let Some(valid_solution) = solution_message { - Self::submit_sync_solution(valid_solution, results); - } else { - // Branch is actually invalid, but we only just figured - // it out. We need to mark it as invalid to prevent - // future use - Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id); - if branch_id == self.sync_finished_last_handled { - self.sync_finished_last_handled = self.sync_finished.last; - } + fn handle_request_commit_message(&mut self, mut message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + let should_propagate_message = match &self.committed_to { + Some((previous_origin, previous_comparison)) => { + // Already committed to something. So will commit to this if it + // takes precedence over the current solution + message.comparison_number > *previous_comparison || + (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_comparison.0) + }, + None => { + // Not yet committed to a solution, so commit to this one + true + } + }; - let branch = &mut self.branches[next_id as usize]; - branch.sync_state = SpeculativeState::Inconsistent; - } - - match branch_next { - Some(id) => next_id = id, - None => break, + if should_propagate_message { + self.committed_to = Some((message.connector_origin, message.comparison_number)); + + if message.to_visit.is_empty() { + // Visited all of the connectors, so every connector can now + // apply the solution + // TODO: Use temporary workspace + let mut to_visit = Vec::with_capacity(message.local_solutions.len() - 1); + for (connector_id, _) in &message.local_solutions { + if *connector_id != ctx.id { + to_visit.push(*connector_id); } } - self.sync_finished_last_handled = next_id; + message.to_visit = to_visit; + self.handle_confirm_commit_message(message.clone(), ctx, delta_state); + delta_state.outbox.push(MessageContents::ConfirmCommit(message)); + } else { + // Not yet visited all of the connectors + delta_state.outbox.push(MessageContents::RequestCommit(message)); } + } + } - return scheduling; - } else { - let scheduling = self.run_in_deterministic_mode(pd, global, results); - return scheduling; + fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + // Make sure this is the message we actually committed to. As long as + // we're running on a single machine this is fine. + // TODO: Take care of nefarious peers + let (expected_connector_id, expected_comparison_number) = + self.committed_to.unwrap(); + assert_eq!(message.connector_origin, expected_connector_id); + assert_eq!(message.comparison_number, expected_comparison_number); + + // Find the branch we're supposed to commit to + let (_, branch_id) = message.local_solutions + .iter() + .find(|(id, _)| *id == ctx.id) + .unwrap(); + let branch_id = *branch_id; + + // Commit to the branch. That is: move the solution branch to the first + // of the connector's branches + self.in_sync = false; + self.branches.swap(0, branch_id.index as usize); + self.branches.truncate(1); // TODO: Or drain and do not deallocate? + let solution = &mut self.branches[0]; + + // Clear all of the other sync-related variables + self.sync_active.clear(); + self.sync_pending_get.clear(); + self.sync_finished.clear(); + self.sync_finished_last_handled = 0; + self.cur_round += 1; + + self.committed_to = None; + self.inbox.clear(); + self.ports.commit_to_sync(); + + // Add/remove any of the ports we lost during the sync phase + for port_delta in &solution.ports_delta { + if port_delta.acquired { + self.ports.add_port(port_delta.port_id); + } else { + self.ports.remove_port(port_delta.port_id); + } } + solution.commit_to_sync(); } + // ------------------------------------------------------------------------- + // Executing connector code + // ------------------------------------------------------------------------- + /// Runs the connector in synchronous mode. Potential changes to the global /// system's state are added to the `RunDeltaState` object by the connector, /// where it is the caller's responsibility to immediately take care of /// those changes. The return value indicates when (and if) the connector /// needs to be scheduled again. - pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(self.in_sync); debug_assert!(!self.sync_active.is_empty()); - let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active); + let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active); // Run the branch to the next blocking point let mut run_context = TempCtx{}; @@ -745,7 +877,7 @@ impl ConnectorPDL { } /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); @@ -815,7 +947,8 @@ impl ConnectorPDL { // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming // linked lists inside of the vector of branches. - fn pop_branch(branches: &mut Vec, queue: &mut BranchQueue) -> &mut Branch { + /// Pops from front of linked-list branch queue. + fn pop_branch_from_queue(branches: &mut Vec, queue: &mut BranchQueue) -> &mut Branch { debug_assert!(queue.first != 0); let branch = &mut branches[queue.first as usize]; *queue.first = branch.next_branch_in_queue.unwrap_or(0); @@ -830,6 +963,7 @@ impl ConnectorPDL { return branch; } + /// Pushes branch at the end of the linked-list branch queue. fn push_branch_into_queue( branches: &mut Vec, queue: &mut BranchQueue, to_push: BranchId, ) { @@ -1000,7 +1134,7 @@ impl ConnectorPDL { /// Generates the initial solution for a finished sync branch. If initial /// local solution is valid, then the appropriate message is returned. /// Otherwise the initial solution is inconsistent. - fn generate_initial_solution_for_branch(&self, branch_id: BranchId, key: &ConnectorKey, global: &GlobalStore) -> Option { + fn generate_initial_solution_for_branch(&self, branch_id: BranchId, ctx: &ConnectorCtx) -> Option { // Retrieve branchg debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode let branch = &self.branches[branch_id.index as usize]; @@ -1025,7 +1159,7 @@ impl ConnectorPDL { } let initial_local_solution = SyncConnectorSolution{ - connector_id: self.id, + connector_id: ctx.id, terminating_branch_id: branch_id, execution_branch_ids: all_branch_ids, final_port_mapping: initial_solution_port_mapping, @@ -1041,17 +1175,13 @@ impl ConnectorPDL { // sender and one for the receiver, ensuring it was not used. // TODO: This will fail if a port is passed around multiple times. // maybe a special "passed along" entry in `ports_delta`. - if !sync_message.check_constraint(self.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) { + if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) { return None; } // Might need to check if we own the other side of the channel - let (peer_port_id, peer_connector_id) = { - let port = global.ports.get(key, port_delta.port_id); - (port.peer_id, port.peer_connector) - }; - - if !sync_message.add_or_check_constraint(peer_connector, SyncBranchConstraint::SilentPort(peer_port_id)).unwrap() { + let port = ctx.get_port(port_delta.port_id); + if !sync_message.add_or_check_constraint(port.peer_connector, SyncBranchConstraint::SilentPort(port.peer_id)).unwrap() { return None; } } @@ -1059,20 +1189,17 @@ impl ConnectorPDL { // - constraints on other components due to owned ports for port_index in 0..self.ports.num_ports() { let port_id = self.ports.get_port_id(port_index); - let port = self.ports.get_port(branch_id.index, port_index); - let (peer_port_id, peer_connector_id, is_getter) = { - let port = global.ports.get(key, port_id); - (port.peer_id, port.peer_connector, port.kind == PortKind::Getter) - }; + let port_mapping = self.ports.get_port(branch_id.index, port_index); + let port = ctx.get_port(port_id); - let constraint = if port.is_assigned { - if is_getter { - SyncBranchConstraint::BranchNumber(port.last_registered_branch_id) + let constraint = if port_mapping.is_assigned { + if port.kind == PortKind::Getter { + SyncBranchConstraint::BranchNumber(port_mapping.last_registered_branch_id) } else { - SyncBranchConstraint::PortMapping(peer_port_id, port.last_registered_branch_id) + SyncBranchConstraint::PortMapping(port.peer_id, port_mapping.last_registered_branch_id) } } else { - SyncBranchConstraint::SilentPort(peer_port_id) + SyncBranchConstraint::SilentPort(port.peer_id) }; if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() { @@ -1083,20 +1210,38 @@ impl ConnectorPDL { return Some(sync_message); } - fn submit_sync_solution(partial_solution: SyncMessage, results: &mut RunDeltaState) { + fn submit_sync_solution(&mut self, partial_solution: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) { if partial_solution.to_visit.is_empty() { - // Solution is completely consistent + // Solution is completely consistent. So ask everyone to commit + // TODO: Maybe another package for random? + let comparison_number: u64 = unsafe { + let mut random_array = [0u8; 8]; + getrandom::getrandom(&mut random_array); + std::mem::transmute(random_array) + }; + + let num_local = partial_solution.local_solutions.len(); + let mut full_solution = SolutionMessage{ - local_solutions: Vec::with_capacity(partial_solution.local_solutions.len()), + comparison_number, + connector_origin: ctx.id, + local_solutions: Vec::with_capacity(num_local), + to_visit: Vec::with_capacity(num_local - 1), }; + for local_solution in &partial_solution.local_solutions { full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id)); + if local_solution.connector_id != ctx.id { + full_solution.to_visit.push(local_solution.connector_id); + } } - results.outbox.push(OutgoingMessage::Solution(full_solution)); + debug_assert!(self.committed_to.is_none()); + self.committed_to = Some((full_solution.connector_origin, full_solution.comparison_number)); + results.outbox.push(MessageContents::RequestCommit(full_solution)); } else { // Still have connectors to visit - results.outbox.push(OutgoingMessage::Sync(partial_solution)); + results.outbox.push(MessageContents::Sync(partial_solution)); } } @@ -1121,7 +1266,7 @@ impl ConnectorPDL { pub(crate) struct RunDeltaState { // Variables that allow the thread running the connector to pick up global // state changes and try to apply them. - pub outbox: Vec, + pub outbox: Vec, pub new_connectors: Vec, // Workspaces pub ports: Vec, @@ -1145,7 +1290,6 @@ pub(crate) enum ConnectorScheduling { NotNow, // Do not reschedule for running } - /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs index 191c529907db1526c6cd6337f190724e0830e2d3..018fa8e82011bd87902ced460d43bdefb7fc77ae 100644 --- a/src/runtime2/global_store.rs +++ b/src/runtime2/global_store.rs @@ -1,3 +1,7 @@ +use std::ptr; +use std::sync::{Arc, Barrier, RwLock, RwLockReadGuard}; +use std::sync::atomic::{AtomicBool, AtomicU32}; + use crate::collections::{MpmcQueue, RawVec}; use super::connector::{ConnectorPDL, ConnectorPublic}; @@ -5,13 +9,11 @@ use super::port::{PortIdLocal, Port, PortKind, PortOwnership, Channel}; use super::inbox::PublicInbox; use super::scheduler::Router; -use std::ptr; -use std::sync::{Barrier, RwLock, RwLockReadGuard}; -use std::sync::atomic::AtomicBool; use crate::ProtocolDescription; use crate::runtime2::connector::{ConnectorScheduling, RunDeltaState}; -use crate::runtime2::inbox::{DataMessage, SyncMessage}; +use crate::runtime2::inbox::{DataMessage, MessageContents, SyncMessage}; use crate::runtime2::native::Connector; +use crate::runtime2::scheduler::ConnectorCtx; /// A kind of token that, once obtained, allows mutable access to a connector. /// We're trying to use move semantics as much as possible: the owner of this @@ -39,7 +41,7 @@ impl ConnectorKey { /// A kind of token that allows shared access to a connector. Multiple threads /// may hold this #[derive(Copy, Clone)] -pub(crate) struct ConnectorId(u32); +pub(crate) struct ConnectorId(pub u32); impl ConnectorId { // TODO: Like the other `new_invalid`, maybe remove @@ -64,31 +66,25 @@ pub enum ConnectorVariant { } impl Connector for ConnectorVariant { - fn insert_data_message(&mut self, message: DataMessage) { + fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { match self { - ConnectorVariant::UserDefined(c) => c.insert_data_message(message), - ConnectorVariant::Native(c) => c.insert_data_message(message), + ConnectorVariant::UserDefined(c) => c.handle_message(message, ctx, delta_state), + ConnectorVariant::Native(c) => c.handle_message(message, ctx, delta_state), } } - fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) { + fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { match self { - ConnectorVariant::UserDefined(c) => c.insert_sync_message(message, global, delta_state), - ConnectorVariant::Native(c) => c.insert_sync_message(message, global, delta_state), - } - } - - fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling { - match self { - ConnectorVariant::UserDefined(c) => c.run(protocol_description, global, delta_state), - ConnectorVariant::Native(c) => c.run(protocol_description, global, delta_state), + ConnectorVariant::UserDefined(c) => c.run(protocol_description, ctx, delta_state), + ConnectorVariant::Native(c) => c.run(protocol_description, ctx, delta_state), } } } pub struct ScheduledConnector { - pub connector: ConnectorVariant, - pub public: ConnectorPublic, + pub connector: ConnectorVariant, // access by connector + pub context: ConnectorCtx, // mutable access by scheduler, immutable by connector + pub public: ConnectorPublic, // accessible by all schedulers and connectors pub router: Router, } @@ -97,7 +93,8 @@ pub struct ScheduledConnector { /// Otherwise one has shared access. /// /// This datastructure is built to be wrapped in a RwLock. -struct ConnectorStore { +pub(crate) struct ConnectorStore { + pub(crate) port_counter: Arc, inner: RwLock, } @@ -109,6 +106,7 @@ struct ConnectorStoreInner { impl ConnectorStore { fn with_capacity(capacity: usize) -> Self { return Self{ + port_counter: Arc::new(AtomicU32::new(0)), inner: RwLock::new(ConnectorStoreInner { connectors: RawVec::with_capacity(capacity), free: Vec::with_capacity(capacity), @@ -141,38 +139,53 @@ impl ConnectorStore { /// Create a new connector, returning the key that can be used to retrieve /// and/or queue it. - pub(crate) fn create(&self, connector: ConnectorVariant) -> ConnectorKey { - let lock = self.inner.write().unwrap(); - let connector = ScheduledConnector{ - connector, - public: ConnectorPublic::new(), - router: Router::new(), - }; + pub(crate) fn create(&self, created_by: &mut ScheduledConnector, connector: ConnectorVariant) -> ConnectorKey { + // Creation of the connector in the global store, requires a lock + { + let lock = self.inner.write().unwrap(); + let connector = ScheduledConnector { + connector, + context: ConnectorCtx::new(self.port_counter.clone()), + public: ConnectorPublic::new(), + router: Router::new(), + }; - let index; - if lock.free.is_empty() { - let connector = Box::into_raw(Box::new(connector)); + let index; + if lock.free.is_empty() { + let connector = Box::into_raw(Box::new(connector)); - unsafe { - // Cheating a bit here. Anyway, move to heap, store in list - index = lock.connectors.len(); - lock.connectors.push(connector); - } - } else { - index = lock.free.pop().unwrap(); + unsafe { + // Cheating a bit here. Anyway, move to heap, store in list + index = lock.connectors.len(); + lock.connectors.push(connector); + } + } else { + index = lock.free.pop().unwrap(); - unsafe { - let target = lock.connectors.get_mut(index); - debug_assert!(!target.is_null()); - ptr::write(*target, connector); + unsafe { + let target = lock.connectors.get_mut(index); + debug_assert!(!target.is_null()); + ptr::write(*target, connector); + } } } - // TODO: Clean up together with the trait + // Setting of new connector's ID let key = ConnectorKey{ index: index as u32 }; - let connector = self.get_mut(&key); - if let ConnectorVariant::UserDefined(connector) = &mut connector.connector { - connector.set_connector_id(key.downcast()); + let new_connector = self.get_mut(&key); + new_connector.context.id = key.downcast(); + + // Transferring ownership of ports (and crashing if there is a + // programmer's mistake in port management) + match &new_connector.connector { + ConnectorVariant::UserDefined(connector) => { + for port_id in &connector.ports.owned_ports { + let mut port = created_by.context.remove_port(*port_id); + port.owning_connector = new_connector.context.id; + new_connector.context.add_port(port); + } + }, + ConnectorVariant::Native(_) => {}, // no initial ports (yet!) } return key; @@ -204,127 +217,6 @@ impl Drop for ConnectorStore { } } -/// The registry of all ports -pub struct PortStore { - inner: RwLock, -} - -struct PortStoreInner { - ports: RawVec, - free: Vec, -} - -impl PortStore { - fn with_capacity(capacity: usize) -> Self { - Self{ - inner: RwLock::new(PortStoreInner{ - ports: RawVec::with_capacity(capacity), - free: Vec::with_capacity(capacity), - }), - } - } - - pub(crate) fn get(&self, key: &ConnectorKey, port_id: PortIdLocal) -> PortRef { - let lock = self.inner.read().unwrap(); - debug_assert!(port_id.is_valid()); - - unsafe { - let port = lock.ports.get_mut(port_id.index as usize); - let port = &mut *port; - debug_assert_eq!(port.owning_connector_id, key.index); // race condition (if they are not equal, which should never happen), better than nothing - - return PortRef{ lock, port }; - } - } - - pub(crate) fn create_channel(&self, creating_connector: ConnectorId) -> Channel { - let mut lock = self.inner.write().unwrap(); - - // Reserves a new port. Doesn't point it to its counterpart - fn reserve_port(lock: &mut std::sync::RwLockWriteGuard<'_, PortStoreInner>, kind: PortKind, creating_connector: ConnectorId) -> u32 { - let index; - - if lock.free.is_empty() { - index = lock.ports.len() as u32; - lock.ports.push(Port{ - self_id: PortIdLocal::new(index), - peer_id: PortIdLocal::new_invalid(), - kind, - ownership: PortOwnership::Owned, - owning_connector: connector_id, - peer_connector: connector_id - }); - } else { - index = lock.free.pop().unwrap() as u32; - let port = unsafe{ &mut *lock.ports.get_mut(index as usize) }; - - port.peer_id = PortIdLocal::new_invalid(); - port.kind = kind; - port.ownership = PortOwnership::Owned; - port.owning_connector = connector_id; - port.peer_connector = connector_id; - } - - return index; - } - - // Create the ports - let putter_id = reserve_port(&mut lock, PortKind::Putter, creating_connector); - let getter_id = reserve_port(&mut lock, PortKind::Getter, creating_connector); - debug_assert_ne!(putter_id, getter_id); - - // Point them to one another - unsafe { - let putter_port = &mut *lock.ports.get_mut(putter_id as usize); - let getter_port = &mut *lock.ports.get_mut(getter_id as usize); - putter_port.peer_id = getter_port.self_id; - getter_port.peer_id = putter_port.self_id; - } - - return Channel{ - putter_id: PortIdLocal::new(putter_id), - getter_id: PortIdLocal::new(getter_id), - } - } -} - -pub struct PortRef<'p> { - lock: RwLockReadGuard<'p, PortStoreInner>, - port: &'static mut Port, -} - -impl<'p> std::ops::Deref for PortRef<'p> { - type Target = Port; - - fn deref(&self) -> &Self::Target { - return self.port; - } -} - -impl<'p> std::ops::DerefMut for PortRef<'p> { - fn deref_mut(&mut self) -> &mut Self::Target { - return self.port; - } -} - -impl Drop for PortStore { - fn drop(&mut self) { - let lock = self.inner.write().unwrap(); - - // Very lazy code - for idx in 0..lock.ports.len() { - if lock.free.contains(&idx) { - continue; - } - - unsafe { - let port = lock.ports.get_mut(idx); - std::ptr::drop_in_place(port); - } - } - } -} - /// Global store of connectors, ports and queues that are used by the sceduler /// threads. The global store has the appearance of a thread-safe datatype, but /// one needs to be careful using it. @@ -335,7 +227,6 @@ impl Drop for PortStore { pub struct GlobalStore { pub connector_queue: MpmcQueue, pub connectors: ConnectorStore, - pub ports: PortStore, pub should_exit: AtomicBool, // signal threads to exit } @@ -344,7 +235,6 @@ impl GlobalStore { Self{ connector_queue: MpmcQueue::with_capacity(256), connectors: ConnectorStore::with_capacity(256), - ports: PortStore::with_capacity(256), should_exit: AtomicBool::new(false), } } diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index db5c94535cae7f96ccc86ca0bc81faa4b00df51c..fbc1130d314a9db50de959332eee57d02652bef8 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -20,29 +20,11 @@ use crate::protocol::eval::ValueGroup; use super::connector::{BranchId, PortIdLocal}; use super::global_store::ConnectorId; -/// A message prepared by a connector. Waiting to be picked up by the runtime to -/// be sent to another connector. -#[derive(Clone)] -pub struct OutgoingDataMessage { - pub sending_port: PortIdLocal, - pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id - pub sender_cur_branch_id: BranchId, // always valid - pub message: ValueGroup, -} - -pub enum OutgoingMessage { - Data(OutgoingDataMessage), - Sync(SyncMessage), - Solution(SolutionMessage), -} - /// A message that has been delivered (after being imbued with the receiving /// port by the scheduler) to a connector. #[derive(Clone)] pub struct DataMessage { - pub sending_connector: ConnectorId, pub sending_port: PortIdLocal, - pub receiving_port: PortIdLocal, pub sender_prev_branch_id: BranchId, pub sender_cur_branch_id: BranchId, pub message: ValueGroup, @@ -186,14 +168,16 @@ impl SyncMessage { } pub struct SolutionMessage { + pub comparison_number: u64, + pub connector_origin: ConnectorId, pub local_solutions: Vec<(ConnectorId, BranchId)>, + pub to_visit: Vec, } /// A control message. These might be sent by the scheduler to notify eachother /// of asynchronous state changes. pub struct ControlMessage { pub id: u32, // generic identifier, used to match request to response - pub sender: ConnectorId, pub content: ControlMessageVariant, } @@ -202,15 +186,21 @@ pub enum ControlMessageVariant { Ack, // acknowledgement of previous control message, matching occurs through control message ID. } -/// Generic message in the `PublicInbox`, handled by the scheduler (which takes -/// out and handles all control message and potential routing). The correctly -/// addressed `Data` variants will end up at the connector. -pub enum Message { - Data(DataMessage), // data message, handled by connector - Sync(SyncMessage), // sync message, handled by both connector/scheduler - Solution(SolutionMessage), // solution message, finishing a sync round - Control(ControlMessage), // control message, handled by scheduler - Ping, // ping message, intentionally waking up a connector (used for native connectors) +/// Generic message contents. +#[derive(Clone)] +pub enum MessageContents { + Data(DataMessage), // data message, handled by connector + Sync(SyncMessage), // sync message, handled by both connector/scheduler + RequestCommit(SolutionMessage), // solution message, requesting participants to commit + ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to + Control(ControlMessage), // control message, handled by scheduler + Ping, // ping message, intentionally waking up a connector (used for native connectors) +} + +pub struct Message { + pub sending_connector: ConnectorId, + pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector) + pub contents: MessageContents, } /// The public inbox of a connector. The thread running the connector that owns diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 61c761bcc1fb52194a9abfe63be452b6aee1837a..b0de3c54ae700703f8812d335c2397c0075f3269 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -7,6 +7,9 @@ use crate::protocol::eval::ValueGroup; use crate::ProtocolDescription; use crate::runtime2::connector::{Branch, find_ports_in_value_group}; use crate::runtime2::global_store::{ConnectorKey, GlobalStore}; +use crate::runtime2::inbox::MessageContents; +use crate::runtime2::port::{Port, PortKind}; +use crate::runtime2::scheduler::ConnectorCtx; use super::RuntimeInner; use super::global_store::{ConnectorVariant, ConnectorId}; @@ -14,14 +17,19 @@ use super::port::{Channel, PortIdLocal}; use super::connector::{ConnectorPDL, ConnectorScheduling, RunDeltaState}; use super::inbox::{Message, DataMessage, SyncMessage}; +/// Generic connector interface from the scheduler's point of view. pub trait Connector { - fn insert_data_message(&mut self, message: DataMessage); - fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState); - fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling; + /// Handle a new message (preprocessed by the scheduler). You probably only + /// want to handle `Data`, `Sync`, and `Solution` messages. The others are + /// intended for the scheduler itself. + fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState); + + /// Should run the connector's behaviour up until the next blocking point. + fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling; } type SyncDone = Arc<(Mutex, Condvar)>; -type JobQueue = Arc>>, +type JobQueue = Arc>>; enum ApplicationJob { NewConnector(ConnectorPDL), @@ -47,15 +55,11 @@ impl ConnectorApplication { } impl Connector for ConnectorApplication { - fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, delta_state: &mut RunDeltaState) { - todo!("handling sync messages in ApplicationConnector"); - } - - fn insert_data_message(&mut self, message: DataMessage) { - todo!("handling messages in ApplicationConnector"); + fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + todo!("handling messages in ConnectorApplication (API for runtime)") } - fn run(&mut self, protocol_description: &ProtocolDescription, global: &GlobalStore, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + fn run(&mut self, protocol_description: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop() { match job { @@ -90,9 +94,18 @@ impl ApplicationInterface { /// Creates a new channel. pub fn create_channel(&mut self) -> Channel { - let channel = self.runtime.global_store.ports.create_channel(self.connector_id); - self.owned_ports.push(channel.putter_id); - self.owned_ports.push(channel.getter_id); + // TODO: Duplicated logic in scheduler + let getter_id = self.runtime.global_store.connectors.port_counter.fetch_add(2, Ordering::SeqCst); + let putter_id = PortIdLocal::new(getter_id + 1); + let getter_id = PortIdLocal::new(getter_id); + + self.ports.push(Port{ + self_id: getter_id, + peer_id: putter_id, + kind: PortKind::Getter, + owning_connector: self.connector_id, + peer_connector: self.connector_id, + }); return channel; } diff --git a/src/runtime2/port.rs b/src/runtime2/port.rs index 43e9d40a5d743309ee20a44059ab0340b91f4ddd..25cd6b3d52b59af1fcb6f0f3969b210e9dbd363a 100644 --- a/src/runtime2/port.rs +++ b/src/runtime2/port.rs @@ -1,6 +1,6 @@ use super::global_store::ConnectorId; -#[derive(Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct PortIdLocal { pub index: u32, } @@ -26,22 +26,13 @@ pub enum PortKind { Getter, } -pub enum PortOwnership { - Unowned, // i.e. held by a native application - Owned, - InTransit, -} - /// Represents a port inside of the runtime. May be without owner if it is /// created by the application interfacing with the runtime, instead of being /// created by a connector. pub struct Port { - // Once created, these values are immutable pub self_id: PortIdLocal, pub peer_id: PortIdLocal, pub kind: PortKind, - // But this can be changed, but only by the connector that owns it - pub ownership: PortOwnership, pub owning_connector: ConnectorId, pub peer_connector: ConnectorId, // might be temporarily inconsistent while peer port is sent around in non-sync phase. } diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index e1f59e016d3faae5850be6fe58a89953f32971a2..efbc77c9238a171452b8f6c5708f021ae5350bff 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,19 +1,95 @@ use std::sync::Arc; use std::sync::Condvar; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Duration; use std::thread; use crate::ProtocolDescription; use crate::runtime2::global_store::ConnectorVariant; +use crate::runtime2::inbox::MessageContents; use crate::runtime2::native::Connector; +use crate::runtime2::port::{Channel, PortKind, PortOwnership}; use super::RuntimeInner; -use super::port::{PortIdLocal}; +use super::port::{Port, PortIdLocal}; use super::inbox::{Message, DataMessage, ControlMessage, ControlMessageVariant}; use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; use super::global_store::{ConnectorKey, ConnectorId, GlobalStore}; +/// Contains fields that are mostly managed by the scheduler, but may be +/// accessed by the connector +pub(crate) struct ConnectorCtx { + pub(crate) id: ConnectorId, + port_counter: Arc, + pub(crate) ports: Vec, +} + +impl ConnectorCtx { + pub(crate) fn new(port_counter: Arc) -> ConnectorCtx { + Self{ + id: ConnectorId::new_invalid(), + port_counter, + ports: Vec::new(), + } + } + + /// Creates a (putter, getter) port pair belonging to the same channel. The + /// port will be implicitly owned by the connector. + pub(crate) fn create_channel(&mut self) -> Channel { + let getter_id = self.port_counter.fetch_add(2, Ordering::SeqCst); + let putter_id = PortIdLocal::new(getter_id + 1); + let getter_id = PortIdLocal::new(getter_id); + + self.ports.push(Port{ + self_id: getter_id, + peer_id: putter_id, + kind: PortKind::Getter, + owning_connector: self.id, + peer_connector: self.id, + }); + + self.ports.push(Port{ + self_id: putter_id, + peer_id: getter_id, + kind: PortKind::Putter, + owning_connector: self.id, + peer_connector: self.id, + }); + + return Channel{ getter_id, putter_id }; + } + + pub(crate) fn add_port(&mut self, port: Port) { + debug_assert!(!self.ports.iter().any(|v| v.self_id == port.self_id)); + self.ports.push(port); + } + + pub(crate) fn remove_port(&mut self, id: PortIdLocal) -> Port { + let index = self.port_id_to_index(id); + return self.ports.remove(index); + } + + pub(crate) fn get_port(&self, id: PortIdLocal) -> &Port { + let index = self.port_id_to_index(id); + return &self.ports[index]; + } + + pub(crate) fn get_port_mut(&mut self, id: PortIdLocal) -> &mut Port { + let index = self.port_id_to_index(id); + return &mut self.ports[index]; + } + + fn port_id_to_index(&self, id: PortIdLocal) -> usize { + for (idx, port) in self.ports.iter().enumerate() { + if port.self_id == id { + return idx; + } + } + + panic!("port {:?}, not owned by connector", id); + } +} + pub(crate) struct Scheduler { runtime: Arc, } @@ -57,76 +133,64 @@ impl Scheduler { while cur_schedule == ConnectorScheduling::Immediate { // Check all the message that are in the shared inbox while let Some(message) = scheduled.public.inbox.take_message() { - // TODO: Put header in front of messages, this is a mess - match message { - Message::Data(message) => { + match message.contents { + MessageContents::Data(content) => { // Check if we need to reroute, or can just put it // in the private inbox of the connector - if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, message.sending_port) { - self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(message)); + if let Some(other_connector_id) = scheduled.router.should_reroute(message.sending_connector, content.sending_port) { + self.send_message_and_wake_up_if_sleeping(other_connector_id, Message::Data(content)); } else { - scheduled.connector.insert_data_message(message); + scheduled.connector.insert_data_message(content); } + } + MessageContents::Sync(content) => { + scheduled.connector.insert_sync_message(content, &scheduled.context, &mut delta_state); + } + MessageContents::Solution(content) => { + // TODO: Handle solution message }, - Message::Sync(message) => { - // TODO: Come back here after rewriting port ownership stuff - if let Some(other_connector_id) = scheduled.router.should_reroute() - }, - Message::Solution(solution) => { - - }, - Message::Control(message) => { - match message.content { + MessageContents::Control(content) => { + match content.content { ControlMessageVariant::ChangePortPeer(port_id, new_target_connector_id) => { // Need to change port target - let port = self.runtime.global_store.ports.get(&connector_key, port_id); + let port = scheduled.context.get_port_mut(port_id); port.peer_connector = new_target_connector_id; debug_assert!(delta_state.outbox.is_empty()); // And respond with an Ack + // Note: after this code has been reached, we may not have any + // messages in the outbox that send to the port whose owning + // connector we just changed. This is because the `ack` will + // clear the rerouting entry of the `ack`-receiver. self.send_message_and_wake_up_if_sleeping( - message.sender, - Message::Control(ControlMessage{ - id: message.id, - sender: connector_key.downcast(), - content: ControlMessageVariant::Ack, - }) + content.sender, + Message{ + sending_connector: connector_key.downcast(), + receiving_port: PortIdLocal::new_invalid(), + contents: MessageContents::Control(ControlMessage{ + id: content.id, + content: ControlMessageVariant::Ack, + }), + } ); }, ControlMessageVariant::Ack => { - scheduled.router.handle_ack(message.id); + scheduled.router.handle_ack(content.id); } } - }, + } Message::Ping => {}, } } // Actually run the connector - // TODO: Revise - let new_schedule; - match &mut scheduled.connector { - ConnectorVariant::UserDefined(connector) => { - if connector.is_in_sync_mode() { - // In synchronous mode, so we can expect messages being sent, - // but we never expect the creation of connectors - new_schedule = connector.run_in_speculative_mode(&self.runtime.protocol_description, &mut delta_state); - debug_assert!(delta_state.new_connectors.is_empty()); - } else { - // In regular running mode (not in a sync block) we cannot send - // messages but we can create new connectors - new_schedule = connector.run_in_deterministic_mode(&self.runtime.protocol_description, &mut delta_state); - debug_assert!(delta_state.outbox.is_empty()); - } - }, - ConnectorVariant::Native(connector) => { - new_schedule = connector.run(&self.runtime.protocol_description); - }, - } + let new_schedule = scheduled.connector.run( + &self.runtime.protocol_description, &scheduled.context, &mut delta_state + ); // Handle all of the output from the current run: messages to // send and connectors to instantiate. - self.handle_delta_state(&connector_key, &mut delta_state); + self.handle_delta_state(&connector_key, &mut scheduled.context, &mut delta_state); cur_schedule = new_schedule; } @@ -164,26 +228,54 @@ impl Scheduler { } } - fn handle_delta_state(&mut self, connector_key: &ConnectorKey, delta_state: &mut RunDeltaState) { + fn handle_delta_state(&mut self, connector_key: &ConnectorKey, context: &mut ConnectorCtx, delta_state: &mut RunDeltaState) { // Handling any messages that were sent + let connector_id = connector_key.downcast(); + if !delta_state.outbox.is_empty() { - for message in delta_state.outbox.drain(..) { - let (inbox_message, target_connector_id) = { - let sending_port = self.runtime.global_store.ports.get(&connector_key, message.sending_port); - ( - DataMessage { - sending_connector: connector_key.downcast(), - sending_port: sending_port.self_id, - receiving_port: sending_port.peer_id, - sender_prev_branch_id: message.sender_prev_branch_id, - sender_cur_branch_id: message.sender_cur_branch_id, - message: message.message, - }, - sending_port.peer_connector, - ) + for mut message in delta_state.outbox.drain(..) { + // Based on the message contents, decide where the message + // should be sent to. This might end up modifying the message. + let (peer_connector, peer_port) = match &mut message { + MessageContents::Data(contents) => { + let port = context.get_port(contents.sending_port); + (port.peer_connector, port.peer_id) + }, + MessageContents::Sync(contents) => { + let connector = contents.to_visit.pop().unwrap(); + (connector, PortIdLocal::new_invalid()) + }, + MessageContents::RequestCommit(contents)=> { + let connector = contents.to_visit.pop().unwrap(); + (connector, PortIdLocal::new_invalid()) + }, + MessageContents::ConfirmCommit(contents) => { + for to_visit in &contents.to_visit { + let message = Message{ + sending_connector: connector_id, + receiving_port: PortIdLocal::new_invalid(), + contents: contents.clone(), + }; + self.send_message_and_wake_up_if_sleeping(*to_visit, message); + } + (ConnectorId::new_invalid(), PortIdLocal::new_invalid()) + }, + MessageContents::Control(_) | MessageContents::Ping => { + // Never generated by the user's code + unreachable!(); + } }; - self.send_message_and_wake_up_if_sleeping(target_connector_id, Message::Data(inbox_message)); + // TODO: Maybe clean this up, perhaps special case for + // ConfirmCommit can be handled differently. + if peer_connector.is_valid() { + let message = Message { + sending_connector: connector_id, + receiving_port: peer_port, + contents: message, + }; + self.send_message_and_wake_up_if_sleeping(peer_connector, message); + } } } @@ -194,25 +286,16 @@ impl Scheduler { for new_connector in delta_state.new_connectors.drain(..) { // Add to global registry to obtain key - let new_key = self.runtime.global_store.connectors.create(ConnectorVariant::UserDefined(new_connector)); + let new_key = self.runtime.global_store.connectors.create(cur_connector, ConnectorVariant::UserDefined(new_connector)); let new_connector = self.runtime.global_store.connectors.get_mut(&new_key); - // Each port should be lost by the connector that created the - // new one. Note that the creator is the current owner. - for port_id in &new_connector.ports.owned_ports { - debug_assert!(!cur_connector.ports.owned_ports.contains(port_id)); - - // Modify ownership, retrieve peer connector - let (peer_connector_id, peer_port_id) = { - let mut port = self.runtime.global_store.ports.get(connector_key, *port_id); - port.owning_connector = new_key.downcast(); - - (port.peer_connector, port.peer_id) - }; - - // Send message that port has changed ownership + // Call above changed ownership of ports, but we still have to + // let the other end of the channel know that the port has + // changed location. + for port in &new_connector.context.ports { let reroute_message = cur_connector.router.prepare_reroute( - port_id, peer_port_id, connector_key.downcast(), peer_connector_id, new_key.downcast() + port.self_id, port.peer_id, cur_connector.context.id, + port.peer_connector, new_connector.context.id ); self.send_message_and_wake_up_if_sleeping(peer_connector_id, reroute_message); @@ -282,7 +365,6 @@ impl Router { return Message::Control(ControlMessage{ id, - sender: self_connector_id, content: ControlMessageVariant::ChangePortPeer(peer_port_id, new_owner_connector_id) }); }