diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 0cd20a6adf9fa855d54512e54e2b32c76eb19717..22e82638e9e0b1d5468aae9428b89ec9c63248ec 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -5,9 +5,11 @@ use std::sync::atomic::AtomicBool; use crate::{PortId, ProtocolDescription}; use crate::protocol::{ComponentState, RunContext, RunResult}; use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::runtime2::global_store::{ConnectorKey, GlobalStore}; -use crate::runtime2::inbox::{OutgoingMessage, SolutionMessage}; +use crate::runtime2::global_store::ConnectorKey; +use crate::runtime2::inbox::{MessageContents, OutgoingMessage, SolutionMessage}; +use crate::runtime2::native::Connector; use crate::runtime2::port::PortKind; +use crate::runtime2::scheduler::ConnectorCtx; use super::global_store::ConnectorId; use super::inbox::{ PrivateInbox, PublicInbox, OutgoingDataMessage, DataMessage, SyncMessage, @@ -97,6 +99,15 @@ impl Branch { ports_delta: parent_branch.ports_delta.clone(), } } + + fn commit_to_sync(&mut self) { + self.index = BranchId::new(0); + self.parent_index = BranchId::new_invalid(); + self.sync_state = SpeculativeState::RunningNonSync; + self.next_branch_in_queue = None; + self.received.clear(); + self.ports_delta.clear(); + } } #[derive(Clone)] @@ -141,8 +152,7 @@ enum PortOwnershipError { AlreadyGivenAway(PortIdLocal) } -/// As the name implies, this contains a description of the ports associated -/// with a connector. +/// Contains a description of the port mapping during a particular sync session. /// TODO: Extend documentation pub(crate) struct ConnectorPorts { // Essentially a mapping from `port_index` to `port_id`. @@ -184,6 +194,24 @@ impl ConnectorPorts { } } + /// Adds a new port. Caller must make sure that the connector is not in the + /// sync phase. + fn add_port(&mut self, port_id: PortIdLocal) { + debug_assert!(self.port_mapping.len() == self.owned_ports.len()); + debug_assert!(!self.owned_ports.contains(&port_id)); + self.owned_ports.push(port_id); + self.port_mapping.push(PortAssignment::new_unassigned()); + } + + /// Commits to a particular branch. Essentially just removes the port + /// mapping information generated during the sync phase. + fn commit_to_sync(&mut self) { + self.port_mapping.truncate(self.owned_ports.len()); + debug_assert!(self.port_mapping.iter().all(|v| { + !v.is_assigned && !v.last_registered_branch_id.is_valid() + })); + } + /// Removes a particular port from the connector. May only be done if the /// connector is in non-sync mode fn remove_port(&mut self, port_id: PortIdLocal) { @@ -251,14 +279,22 @@ struct BranchQueue { } impl BranchQueue { + #[inline] fn new() -> Self { Self{ first: 0, last: 0 } } + #[inline] fn is_empty(&self) -> bool { debug_assert!((self.first == 0) == (self.last == 0)); return self.first == 0; } + + #[inline] + fn clear(&mut self) { + self.first = 0; + self.last = 0; + } } /// Public fields of the connector that can be freely shared between multiple @@ -281,7 +317,6 @@ impl ConnectorPublic { // TODO: Do this outside of the connector, create a wrapping struct pub(crate) struct ConnectorPDL { // State and properties of connector itself - id: ConnectorId, in_sync: bool, // Branch management branches: Vec, // first branch is always non-speculative one @@ -289,7 +324,9 @@ pub(crate) struct ConnectorPDL { sync_pending_get: BranchQueue, sync_finished: BranchQueue, sync_finished_last_handled: u32, + cur_round: u32, // Port/message management + pub committed_to: Option<(ConnectorId, u64)>, pub inbox: PrivateInbox, pub ports: ConnectorPorts, } @@ -313,43 +350,112 @@ impl RunContext for TempCtx { } } +impl Connector for ConnectorPDL { + fn handle_message(&mut self, message: MessageContents, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + use MessageContents as MC; + + match message { + MC::Data(message) => self.handle_data_message(message), + MC::Sync(message) => self.handle_sync_message(message, ctx, delta_state), + MC::RequestCommit(message) => self.handle_request_commit_message(message, ctx, delta_state), + MC::ConfirmCommit(message) => self.handle_confirm_commit_message(message, ctx, delta_state), + MC::Control(_) | MC::Ping => {}, + } + } + + fn run(&mut self, pd: &ProtocolDescription, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling { + if self.in_sync { + let scheduling = self.run_in_speculative_mode(pd, ctx, results); + + // When in speculative mode we might have generated new sync + // solutions, we need to turn them into proposed solutions here. + if self.sync_finished_last_handled != self.sync_finished.last { + // Retrieve first element in queue + let mut next_id; + if self.sync_finished_last_handled == 0 { + next_id = self.sync_finished.first; + } else { + let last_handled = &self.branches[self.sync_finished_last_handled as usize]; + debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue" + next_id = last_handled.next_branch_in_queue.unwrap(); + } + + loop { + let branch_id = BranchId::new(next_id); + let branch = &self.branches[next_id as usize]; + let branch_next = branch.next_branch_in_queue; + + // Turn local solution into a message and send it along + // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed + let solution_message = self.generate_initial_solution_for_branch(branch_id, ctx); + if let Some(valid_solution) = solution_message { + self.submit_sync_solution(valid_solution, ctx, results); + } else { + // Branch is actually invalid, but we only just figured + // it out. We need to mark it as invalid to prevent + // future use + Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id); + if branch_id == self.sync_finished_last_handled { + self.sync_finished_last_handled = self.sync_finished.last; + } + + let branch = &mut self.branches[next_id as usize]; + branch.sync_state = SpeculativeState::Inconsistent; + } + + match branch_next { + Some(id) => next_id = id, + None => break, + } + } + + self.sync_finished_last_handled = next_id; + } + + return scheduling; + } else { + let scheduling = self.run_in_deterministic_mode(pd, ctx, results); + return scheduling; + } + } +} + impl ConnectorPDL { /// Constructs a representation of a connector. The assumption is that the /// initial branch is at the first instruction of the connector's code, - /// hence is in a non-sync state. Note that the initial ID is invalid, we - /// assume the connector will get inserted into the runtime, there it will - /// receive its ID. + /// hence is in a non-sync state. pub fn new(initial_branch: Branch, owned_ports: Vec) -> Self { Self{ - id: ConnectorId::new_invalid(), in_sync: false, branches: vec![initial_branch], sync_active: BranchQueue::new(), sync_pending_get: BranchQueue::new(), sync_finished: BranchQueue::new(), sync_finished_last_handled: 0, // none at all + cur_round: 0, + committed_to: None, inbox: PrivateInbox::new(), ports: ConnectorPorts::new(owned_ports), } } - pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { - debug_assert!(!self.id.is_valid()); // ID should only be set once - self.id = id; - } - pub fn is_in_sync_mode(&self) -> bool { return self.in_sync; } - pub fn insert_data_message(&mut self, message: DataMessage) { + // ------------------------------------------------------------------------- + // Handling connector messages + // ------------------------------------------------------------------------- + + #[inline] + pub fn handle_data_message(&mut self, message: DataMessage) { self.inbox.insert_message(message); } /// Accepts a synchronous message and combines it with the locally stored - /// solution(s). - pub fn insert_sync_message(&mut self, message: SyncMessage, global: &GlobalStore, results: &mut RunDeltaState) { - debug_assert!(!message.to_visit.contains(&self.id)); // own ID already removed + /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate. + pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) { + debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed debug_assert!(message.constraints.iter().any(|v| v.connector_id == self.id)); // we have constraints // TODO: Optimize, use some kind of temp workspace vector @@ -359,7 +465,7 @@ impl ConnectorPDL { // We have some solutions to match against let constraints_index = message.constraints .iter() - .position(|v| v.connector_id == self.id) + .position(|v| v.connector_id == ctx.id) .unwrap(); let constraints = &message.constraints[constraints_index].constraints; debug_assert!(!constraints.is_empty()); @@ -437,7 +543,7 @@ impl ConnectorPDL { // - replace constraints with a local solution new_solution.constraints.remove(constraints_index); new_solution.local_solutions.push(SyncConnectorSolution{ - connector_id: self.id, + connector_id: ctx.id, terminating_branch_id: BranchId::new(branch_index), execution_branch_ids: execution_path_branch_ids.clone(), final_port_mapping: new_solution_mapping, @@ -449,8 +555,7 @@ impl ConnectorPDL { let port_id = self.ports.get_port_id(port_index); let (peer_connector_id, peer_port_id, peer_is_getter) = { - let key = unsafe{ ConnectorKey::from_id(self.id) }; - let port = global.ports.get(&key, port_id); + let port = ctx.get_port(port_id); (port.peer_connector, port.peer_id, port.kind == PortKind::Putter) }; @@ -474,7 +579,7 @@ impl ConnectorPDL { // If here, then the newly generated solution is completely // compatible. - Self::submit_sync_solution(new_solution, results); + self.submit_sync_solution(new_solution, ctx, results); // Consider the next branch if branch_index == self.sync_finished_last_handled { @@ -488,76 +593,103 @@ impl ConnectorPDL { } } - // TODO: Remove GlobalStore, is used to retrieve ports. Ports belong with - // the connector itself, half managed, half accessible (a-la PublicInbox - // and PrivateInbox) - pub fn run(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling { - if self.in_sync { - let scheduling = self.run_in_speculative_mode(pd, global, results); - - // When in speculative mode we might have generated new sync - // solutions, we need to turn them into proposed solutions here. - if self.sync_finished_last_handled != self.sync_finished.last { - // Retrieve first element in queue - let mut next_id; - if self.sync_finished_last_handled == 0 { - next_id = self.sync_finished.first; - } else { - let last_handled = &self.branches[self.sync_finished_last_handled as usize]; - debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue" - next_id = last_handled.next_branch_in_queue.unwrap(); - } - - loop { - let branch_id = BranchId::new(next_id); - let branch = &self.branches[next_id as usize]; - let branch_next = branch.next_branch_in_queue; - - // Turn local solution into a message and send it along - // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed - let connector_key = unsafe{ ConnectorKey::from_id(self.id) }; - let solution_message = self.generate_initial_solution_for_branch(branch_id, &connector_key, global); - if let Some(valid_solution) = solution_message { - Self::submit_sync_solution(valid_solution, results); - } else { - // Branch is actually invalid, but we only just figured - // it out. We need to mark it as invalid to prevent - // future use - Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id); - if branch_id == self.sync_finished_last_handled { - self.sync_finished_last_handled = self.sync_finished.last; - } + fn handle_request_commit_message(&mut self, mut message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + let should_propagate_message = match &self.committed_to { + Some((previous_origin, previous_comparison)) => { + // Already committed to something. So will commit to this if it + // takes precedence over the current solution + message.comparison_number > *previous_comparison || + (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_comparison.0) + }, + None => { + // Not yet committed to a solution, so commit to this one + true + } + }; - let branch = &mut self.branches[next_id as usize]; - branch.sync_state = SpeculativeState::Inconsistent; - } - - match branch_next { - Some(id) => next_id = id, - None => break, + if should_propagate_message { + self.committed_to = Some((message.connector_origin, message.comparison_number)); + + if message.to_visit.is_empty() { + // Visited all of the connectors, so every connector can now + // apply the solution + // TODO: Use temporary workspace + let mut to_visit = Vec::with_capacity(message.local_solutions.len() - 1); + for (connector_id, _) in &message.local_solutions { + if *connector_id != ctx.id { + to_visit.push(*connector_id); } } - self.sync_finished_last_handled = next_id; + message.to_visit = to_visit; + self.handle_confirm_commit_message(message.clone(), ctx, delta_state); + delta_state.outbox.push(MessageContents::ConfirmCommit(message)); + } else { + // Not yet visited all of the connectors + delta_state.outbox.push(MessageContents::RequestCommit(message)); } + } + } - return scheduling; - } else { - let scheduling = self.run_in_deterministic_mode(pd, global, results); - return scheduling; + fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) { + // Make sure this is the message we actually committed to. As long as + // we're running on a single machine this is fine. + // TODO: Take care of nefarious peers + let (expected_connector_id, expected_comparison_number) = + self.committed_to.unwrap(); + assert_eq!(message.connector_origin, expected_connector_id); + assert_eq!(message.comparison_number, expected_comparison_number); + + // Find the branch we're supposed to commit to + let (_, branch_id) = message.local_solutions + .iter() + .find(|(id, _)| *id == ctx.id) + .unwrap(); + let branch_id = *branch_id; + + // Commit to the branch. That is: move the solution branch to the first + // of the connector's branches + self.in_sync = false; + self.branches.swap(0, branch_id.index as usize); + self.branches.truncate(1); // TODO: Or drain and do not deallocate? + let solution = &mut self.branches[0]; + + // Clear all of the other sync-related variables + self.sync_active.clear(); + self.sync_pending_get.clear(); + self.sync_finished.clear(); + self.sync_finished_last_handled = 0; + self.cur_round += 1; + + self.committed_to = None; + self.inbox.clear(); + self.ports.commit_to_sync(); + + // Add/remove any of the ports we lost during the sync phase + for port_delta in &solution.ports_delta { + if port_delta.acquired { + self.ports.add_port(port_delta.port_id); + } else { + self.ports.remove_port(port_delta.port_id); + } } + solution.commit_to_sync(); } + // ------------------------------------------------------------------------- + // Executing connector code + // ------------------------------------------------------------------------- + /// Runs the connector in synchronous mode. Potential changes to the global /// system's state are added to the `RunDeltaState` object by the connector, /// where it is the caller's responsibility to immediately take care of /// those changes. The return value indicates when (and if) the connector /// needs to be scheduled again. - pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_speculative_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(self.in_sync); debug_assert!(!self.sync_active.is_empty()); - let branch = Self::pop_branch(&mut self.branches, &mut self.sync_active); + let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active); // Run the branch to the next blocking point let mut run_context = TempCtx{}; @@ -745,7 +877,7 @@ impl ConnectorPDL { } /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, global: &GlobalStore, results: &mut RunDeltaState) -> ConnectorScheduling { + pub fn run_in_deterministic_mode(&mut self, pd: &ProtocolDescription, context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling { debug_assert!(!self.in_sync); debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); debug_assert!(self.branches.len() == 1); @@ -815,7 +947,8 @@ impl ConnectorPDL { // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming // linked lists inside of the vector of branches. - fn pop_branch(branches: &mut Vec, queue: &mut BranchQueue) -> &mut Branch { + /// Pops from front of linked-list branch queue. + fn pop_branch_from_queue(branches: &mut Vec, queue: &mut BranchQueue) -> &mut Branch { debug_assert!(queue.first != 0); let branch = &mut branches[queue.first as usize]; *queue.first = branch.next_branch_in_queue.unwrap_or(0); @@ -830,6 +963,7 @@ impl ConnectorPDL { return branch; } + /// Pushes branch at the end of the linked-list branch queue. fn push_branch_into_queue( branches: &mut Vec, queue: &mut BranchQueue, to_push: BranchId, ) { @@ -1000,7 +1134,7 @@ impl ConnectorPDL { /// Generates the initial solution for a finished sync branch. If initial /// local solution is valid, then the appropriate message is returned. /// Otherwise the initial solution is inconsistent. - fn generate_initial_solution_for_branch(&self, branch_id: BranchId, key: &ConnectorKey, global: &GlobalStore) -> Option { + fn generate_initial_solution_for_branch(&self, branch_id: BranchId, ctx: &ConnectorCtx) -> Option { // Retrieve branchg debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode let branch = &self.branches[branch_id.index as usize]; @@ -1025,7 +1159,7 @@ impl ConnectorPDL { } let initial_local_solution = SyncConnectorSolution{ - connector_id: self.id, + connector_id: ctx.id, terminating_branch_id: branch_id, execution_branch_ids: all_branch_ids, final_port_mapping: initial_solution_port_mapping, @@ -1041,17 +1175,13 @@ impl ConnectorPDL { // sender and one for the receiver, ensuring it was not used. // TODO: This will fail if a port is passed around multiple times. // maybe a special "passed along" entry in `ports_delta`. - if !sync_message.check_constraint(self.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) { + if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)) { return None; } // Might need to check if we own the other side of the channel - let (peer_port_id, peer_connector_id) = { - let port = global.ports.get(key, port_delta.port_id); - (port.peer_id, port.peer_connector) - }; - - if !sync_message.add_or_check_constraint(peer_connector, SyncBranchConstraint::SilentPort(peer_port_id)).unwrap() { + let port = ctx.get_port(port_delta.port_id); + if !sync_message.add_or_check_constraint(port.peer_connector, SyncBranchConstraint::SilentPort(port.peer_id)).unwrap() { return None; } } @@ -1059,20 +1189,17 @@ impl ConnectorPDL { // - constraints on other components due to owned ports for port_index in 0..self.ports.num_ports() { let port_id = self.ports.get_port_id(port_index); - let port = self.ports.get_port(branch_id.index, port_index); - let (peer_port_id, peer_connector_id, is_getter) = { - let port = global.ports.get(key, port_id); - (port.peer_id, port.peer_connector, port.kind == PortKind::Getter) - }; + let port_mapping = self.ports.get_port(branch_id.index, port_index); + let port = ctx.get_port(port_id); - let constraint = if port.is_assigned { - if is_getter { - SyncBranchConstraint::BranchNumber(port.last_registered_branch_id) + let constraint = if port_mapping.is_assigned { + if port.kind == PortKind::Getter { + SyncBranchConstraint::BranchNumber(port_mapping.last_registered_branch_id) } else { - SyncBranchConstraint::PortMapping(peer_port_id, port.last_registered_branch_id) + SyncBranchConstraint::PortMapping(port.peer_id, port_mapping.last_registered_branch_id) } } else { - SyncBranchConstraint::SilentPort(peer_port_id) + SyncBranchConstraint::SilentPort(port.peer_id) }; if !sync_message.add_or_check_constraint(peer_connector_id, constraint).unwrap() { @@ -1083,20 +1210,38 @@ impl ConnectorPDL { return Some(sync_message); } - fn submit_sync_solution(partial_solution: SyncMessage, results: &mut RunDeltaState) { + fn submit_sync_solution(&mut self, partial_solution: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) { if partial_solution.to_visit.is_empty() { - // Solution is completely consistent + // Solution is completely consistent. So ask everyone to commit + // TODO: Maybe another package for random? + let comparison_number: u64 = unsafe { + let mut random_array = [0u8; 8]; + getrandom::getrandom(&mut random_array); + std::mem::transmute(random_array) + }; + + let num_local = partial_solution.local_solutions.len(); + let mut full_solution = SolutionMessage{ - local_solutions: Vec::with_capacity(partial_solution.local_solutions.len()), + comparison_number, + connector_origin: ctx.id, + local_solutions: Vec::with_capacity(num_local), + to_visit: Vec::with_capacity(num_local - 1), }; + for local_solution in &partial_solution.local_solutions { full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id)); + if local_solution.connector_id != ctx.id { + full_solution.to_visit.push(local_solution.connector_id); + } } - results.outbox.push(OutgoingMessage::Solution(full_solution)); + debug_assert!(self.committed_to.is_none()); + self.committed_to = Some((full_solution.connector_origin, full_solution.comparison_number)); + results.outbox.push(MessageContents::RequestCommit(full_solution)); } else { // Still have connectors to visit - results.outbox.push(OutgoingMessage::Sync(partial_solution)); + results.outbox.push(MessageContents::Sync(partial_solution)); } } @@ -1121,7 +1266,7 @@ impl ConnectorPDL { pub(crate) struct RunDeltaState { // Variables that allow the thread running the connector to pick up global // state changes and try to apply them. - pub outbox: Vec, + pub outbox: Vec, pub new_connectors: Vec, // Workspaces pub ports: Vec, @@ -1145,7 +1290,6 @@ pub(crate) enum ConnectorScheduling { NotNow, // Do not reschedule for running } - /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) {