use std::collections::HashMap;
use std::sync::atomic::AtomicBool;

use crate::{PortId, ProtocolDescription};
use crate::protocol::{ComponentState, RunContext, RunResult};
use crate::protocol::eval::{Prompt, Value, ValueGroup};
use crate::runtime2::scheduler::Scheduler;

use super::ConnectorId;
use super::native::Connector;
use super::scheduler::{SchedulerCtx, ConnectorCtx};
use super::inbox::{
    PrivateInbox, PublicInbox,
    DataMessage, SyncMessage, SolutionMessage, Message, MessageContents,
    SyncBranchConstraint, SyncConnectorSolution
};
use super::port::{Port, PortKind, PortIdLocal};

/// Represents the identifier of a branch (the index within its container). An
/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not
/// yet receive anything from any branch).
// TODO: Remove Debug derive
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BranchId {
    pub index: u32,
}

impl BranchId {
    fn new_invalid() -> Self {
        Self{ index: 0 }
    }

    fn new(index: u32) -> Self {
        debug_assert!(index != 0);
        Self{ index }
    }

    #[inline]
    pub(crate) fn is_valid(&self) -> bool {
        return self.index != 0;
    }
}

#[derive(Debug, PartialEq, Eq)]
pub(crate) enum SpeculativeState {
    // Non-synchronous variants
    RunningNonSync,      // regular execution of code
    Error,               // encountered a runtime error
    Finished,            // finished executing connector's code
    // Synchronous variants
    RunningInSync,       // running within a sync block
    HaltedAtBranchPoint, // at a branching point (at a `get` call)
    ReachedSyncEnd,      // reached end of sync block, branch represents a local solution
    Inconsistent,        // branch can never represent a local solution, so halted
}

pub(crate) struct Branch {
    index: BranchId,
    parent_index: BranchId,
    // Code execution state
    code_state: ComponentState,
    prepared_channel: Option<(Value, Value)>,
    sync_state: SpeculativeState,
    halted_at_port: PortIdLocal, // invalid if not halted
    next_branch_in_queue: Option<u32>,
    // Message/port state
    received: HashMap<PortIdLocal, DataMessage>, // TODO: @temporary, remove together with fires()
    ports_delta: Vec<PortOwnershipDelta>,
}

impl Branch {
    /// Constructs a non-sync branch. It is assumed that the code is at the
    /// first instruction.
    pub(crate) fn new_initial_branch(component_state: ComponentState) -> Self {
        Branch{
            index: BranchId::new_invalid(),
            parent_index: BranchId::new_invalid(),
            code_state: component_state,
            prepared_channel: None,
            sync_state: SpeculativeState::RunningNonSync,
            halted_at_port: PortIdLocal::new_invalid(),
            next_branch_in_queue: None,
            received: HashMap::new(),
            ports_delta: Vec::new(),
        }
    }

    /// Constructs a sync branch. The provided branch is assumed to be the
    /// parent of the new branch within the execution tree.
    fn new_sync_branching_from(new_index: u32, parent_branch: &Branch) -> Self {
        debug_assert!(
            (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) ||
            (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint)
        );
        debug_assert!(parent_branch.prepared_channel.is_none());

        Branch{
            index: BranchId::new(new_index),
            parent_index: parent_branch.index,
            code_state: parent_branch.code_state.clone(),
            prepared_channel: None,
            sync_state: SpeculativeState::RunningInSync,
            halted_at_port: PortIdLocal::new_invalid(),
            next_branch_in_queue: None,
            received: parent_branch.received.clone(),
            ports_delta: parent_branch.ports_delta.clone(),
        }
    }

    fn commit_to_sync(&mut self) {
        // Logically impossible conditions (because we have a finished branch
        // we are going to commit to)
        debug_assert!(self.prepared_channel.is_none());
        debug_assert!(!self.halted_at_port.is_valid());

        // Reset other variables to their defaults
        self.index = BranchId::new_invalid();
        self.parent_index = BranchId::new_invalid();
        self.sync_state = SpeculativeState::RunningNonSync;
        self.next_branch_in_queue = None;
        self.received.clear();
        self.ports_delta.clear();
    }
}

#[derive(Clone)]
struct PortAssignment {
    is_assigned: bool,
    last_registered_branch_id: BranchId, // invalid branch ID implies not assigned yet
    num_times_fired: u32,
}

impl PortAssignment {
    fn new_unassigned() -> Self {
        Self{
            is_assigned: false,
            last_registered_branch_id: BranchId::new_invalid(),
            num_times_fired: 0,
        }
    }

    #[inline]
    fn mark_speculative(&mut self, num_times_fired: u32) {
        debug_assert!(!self.last_registered_branch_id.is_valid());
        self.is_assigned = true;
        self.num_times_fired = num_times_fired;
    }

    #[inline]
    fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) {
        self.is_assigned = true;
        self.last_registered_branch_id = branch_id;
        self.num_times_fired = num_times_fired;
    }
}

#[derive(Clone)]
struct PortOwnershipDelta {
    acquired: bool, // if false, then released ownership
    port_id: PortIdLocal,
}

#[derive(Debug)]
enum PortOwnershipError {
    UsedInInteraction(PortIdLocal),
    AlreadyGivenAway(PortIdLocal),
}

/// Contains a description of the port mapping during a particular sync session.
/// TODO: Extend documentation
pub(crate) struct ConnectorPorts {
    // Essentially a mapping from `port_index` to `port_id`.
    pub owned_ports: Vec<PortIdLocal>,
    // Contains P*B entries, where P is the number of ports and B is the number
    // of branches. One can find the appropriate mapping of port p at branch b
    // at linear index `b*P+p`.
    port_mapping: Vec<PortAssignment>,
}

impl ConnectorPorts {
    /// Constructs the initial ports object. Assumes the presence of the
    /// non-sync branch at index 0. Will initialize all entries for the non-sync
    /// branch.
    fn new(owned_ports: Vec<PortIdLocal>) -> Self {
        let num_ports = owned_ports.len();
        let mut port_mapping = Vec::with_capacity(num_ports);
        for _ in 0..num_ports {
            port_mapping.push(PortAssignment::new_unassigned());
        }

        Self{ owned_ports, port_mapping }
    }

    /// Prepares the port mapping for a new branch. Assumes that there is no
    /// intermediate branch index that we have skipped.
    fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) {
        let num_ports = self.owned_ports.len();
        let parent_base_idx = parent_branch_idx as usize * num_ports;
        let new_base_idx = new_branch_idx as usize * num_ports;

        debug_assert!(parent_branch_idx < new_branch_idx);
        debug_assert!(new_base_idx == self.port_mapping.len());

        self.port_mapping.reserve(num_ports);
        for offset in 0..num_ports {
            let parent_port = &self.port_mapping[parent_base_idx + offset];
            let parent_port = parent_port.clone();
            self.port_mapping.push(parent_port);
        }
    }

    /// Adds a new port. Caller must make sure that the connector is not in the
    /// sync phase.
    fn add_port(&mut self, port_id: PortIdLocal) {
        debug_assert!(self.port_mapping.len() == self.owned_ports.len());
        debug_assert!(!self.owned_ports.contains(&port_id));
        self.owned_ports.push(port_id);
        self.port_mapping.push(PortAssignment::new_unassigned());
    }

    /// Commits to a particular branch. Essentially just removes the port
    /// mapping information generated during the sync phase.
    fn commit_to_sync(&mut self) {
        self.port_mapping.truncate(self.owned_ports.len());
        debug_assert!(self.port_mapping.iter().all(|v| {
            !v.is_assigned && !v.last_registered_branch_id.is_valid()
        }));
    }

    /// Removes a particular port from the connector. May only be done if the
    /// connector is in non-sync mode
    fn remove_port(&mut self, port_id: PortIdLocal) {
        debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode
        let port_index = self.get_port_index(port_id).unwrap();
        self.owned_ports.remove(port_index);
        self.port_mapping.remove(port_index);
    }

    /// Retrieves the index associated with a port id. Note that the port might
    /// not exist (yet) if a speculative branch has just received the port.
    /// TODO: But then again, one cannot use that port, right?
    #[inline]
    fn get_port_index(&self, port_id: PortIdLocal) -> Option<usize> {
        for (idx, port) in self.owned_ports.iter().enumerate() {
            if *port == port_id {
                return Some(idx);
            }
        }

        return None;
    }

    /// Retrieves the ID associated with the port at the provided index
    #[inline]
    fn get_port_id(&self, port_index: usize) -> PortIdLocal {
        return self.owned_ports[port_index];
    }

    #[inline]
    fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment {
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
        return &self.port_mapping[mapped_idx];
    }

    #[inline]
    fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment {
        let mapped_idx = self.mapped_index(branch_idx, port_idx);
        return &mut self.port_mapping[mapped_idx];
    }

    #[inline]
    fn num_ports(&self) -> usize {
        return self.owned_ports.len();
    }

    // Function for internal use: retrieve index in flattened port mapping array
    // based on branch/port index.
    #[inline]
    fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize {
        let branch_idx = branch_idx as usize;
        let num_ports = self.owned_ports.len();

        debug_assert!(port_idx < num_ports);
        debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len());

        return branch_idx * num_ports + port_idx;
    }
}

struct BranchQueue {
    first: u32,
    last: u32,
}

impl BranchQueue {
    #[inline]
    fn new() -> Self {
        Self{ first: 0, last: 0 }
    }

    #[inline]
    fn is_empty(&self) -> bool {
        debug_assert!((self.first == 0) == (self.last == 0));
        return self.first == 0;
    }

    #[inline]
    fn clear(&mut self) {
        self.first = 0;
        self.last = 0;
    }
}
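// Illustrative sketch (not from the original source): minimal tests for the
// conventions documented above, i.e. branch index 0 means "invalid" and a
// speculative port assignment fixes the firing count without registering a
// branch. Everything used here is defined in this module.
#[cfg(test)]
mod branch_id_examples {
    use super::*;

    #[test]
    fn branch_id_zero_is_invalid() {
        assert!(!BranchId::new_invalid().is_valid());
        assert!(BranchId::new(1).is_valid());
    }

    #[test]
    fn speculative_then_definitive_port_assignment() {
        let mut assignment = PortAssignment::new_unassigned();
        assert!(!assignment.is_assigned);

        // Speculative: firing count known, registering branch still unset
        assignment.mark_speculative(1);
        assert!(assignment.is_assigned);
        assert!(!assignment.last_registered_branch_id.is_valid());

        // Definitive: the branch that fired the port is recorded as well
        assignment.mark_definitive(BranchId::new(2), 1);
        assert_eq!(assignment.last_registered_branch_id, BranchId::new(2));
        assert_eq!(assignment.num_times_fired, 1);
    }
}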
/// Public fields of the connector that can be freely shared between multiple
/// threads.
pub(crate) struct ConnectorPublic {
    pub inbox: PublicInbox,
    pub sleeping: AtomicBool,
}

impl ConnectorPublic {
    pub fn new(initialize_as_sleeping: bool) -> Self {
        ConnectorPublic{
            inbox: PublicInbox::new(),
            sleeping: AtomicBool::new(initialize_as_sleeping),
        }
    }
}

// TODO: Maybe prevent false sharing by aligning `public` to next cache line.
// TODO: Do this outside of the connector, create a wrapping struct
pub(crate) struct ConnectorPDL {
    // State and properties of connector itself
    in_sync: bool,
    // Branch management
    branches: Vec<Branch>, // first branch is always non-speculative one
    sync_active: BranchQueue,
    sync_pending_get: BranchQueue,
    sync_finished: BranchQueue,
    sync_finished_last_handled: u32, // TODO: Change to BranchId?
    cur_round: u32,
    // Port/message management
    pub committed_to: Option<(ConnectorId, u64)>,
    pub inbox: PrivateInbox,
    pub ports: ConnectorPorts,
}

// TODO: Remove this monstrosity
struct ConnectorRunContext<'a> {
    branch_index: u32,
    ports: &'a ConnectorPorts,
    ports_delta: &'a Vec<PortOwnershipDelta>,
    received: &'a HashMap<PortIdLocal, DataMessage>,
    scheduler: SchedulerCtx<'a>,
    prepared_channel: Option<(Value, Value)>,
}

impl<'a> RunContext for ConnectorRunContext<'a> {
    fn did_put(&mut self, port: PortId) -> bool {
        if self.ports_delta.iter().any(|v| v.port_id.index == port.0.u32_suffix) {
            // Either acquired or released, must be silent
            return false;
        }

        let port_index = self.ports.get_port_index(PortIdLocal::new(port.0.u32_suffix)).unwrap();
        let mapping = self.ports.get_port(self.branch_index, port_index);
        return mapping.is_assigned;
    }

    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        match self.received.get(&port_id) {
            Some(message) => Some(message.message.clone()),
            None => None,
        }
    }

    fn fires(&mut self, port: PortId) -> Option<Value> {
        let port_id = PortIdLocal::new(port.0.u32_suffix);
        if self.ports_delta.iter().any(|v| v.port_id == port_id) {
            return None;
        }

        let port_index = self.ports.get_port_index(port_id).unwrap();
        let mapping = self.ports.get_port(self.branch_index, port_index);
        if mapping.is_assigned {
            return Some(Value::Bool(mapping.num_times_fired != 0));
        } else {
            return None;
        }
    }

    fn get_channel(&mut self) -> Option<(Value, Value)> {
        return self.prepared_channel.take();
    }
}

impl Connector for ConnectorPDL {
    fn handle_message(&mut self, message: Message, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
        use MessageContents as MC;

        match message.contents {
            MC::Data(content) => self.handle_data_message(message.receiving_port, content),
            MC::Sync(content) => self.handle_sync_message(content, ctx, delta_state),
            MC::RequestCommit(content) => self.handle_request_commit_message(content, ctx, delta_state),
            MC::ConfirmCommit(content) => self.handle_confirm_commit_message(content, ctx, delta_state),
            MC::Control(_) | MC::Ping => {},
        }
    }

    fn run(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) -> ConnectorScheduling {
        if self.in_sync {
            // Check for new messages we haven't seen before. If any of the
            // pending branches can accept the message, do so.
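            // Note on the loop below: each pending message is matched against
            // every branch halted in `sync_pending_get`. A branch that halted
            // on the message's target port (and whose port mapping matches the
            // sender's previous branch) is forked into a new speculative
            // branch that is scheduled on `sync_active`; the halted branch
            // itself stays in `sync_pending_get` so later messages can fork it
            // again.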
            while let Some((target_port_id, message)) = self.inbox.next_message() {
                let mut branch_idx = self.sync_pending_get.first;
                while branch_idx != 0 {
                    let branch = &self.branches[branch_idx as usize];
                    let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0);

                    let target_port_index = self.ports.get_port_index(*target_port_id).unwrap();
                    let port_mapping = self.ports.get_port(branch_idx, target_port_index);

                    if branch.sync_state == SpeculativeState::HaltedAtBranchPoint &&
                        branch.halted_at_port == *target_port_id &&
                        port_mapping.last_registered_branch_id == message.sender_prev_branch_id
                    {
                        // Branch may accept this message, so create a fork that
                        // contains this message in the inbox.
                        let new_branch_idx = self.branches.len() as u32;
                        let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch);
                        self.ports.prepare_sync_branch(branch_idx, new_branch_idx);

                        let mapping = self.ports.get_port_mut(branch_idx, target_port_index);
                        mapping.last_registered_branch_id = message.sender_cur_branch_id;

                        let new_branch_id = BranchId::new(new_branch_idx);
                        self.branches.push(new_branch);
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id);
                    }

                    branch_idx = next_branch_idx;
                }
            }

            let scheduling = self.run_in_speculative_mode(sched_ctx, conn_ctx, delta_state);

            // When in speculative mode we might have generated new sync
            // solutions, we need to turn them into proposed solutions here.
            if self.sync_finished_last_handled != self.sync_finished.last {
                // Retrieve first element in queue
                let mut next_id;
                if self.sync_finished_last_handled == 0 {
                    next_id = self.sync_finished.first;
                } else {
                    let last_handled = &self.branches[self.sync_finished_last_handled as usize];
                    debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue"
                    next_id = last_handled.next_branch_in_queue.unwrap();
                }

                loop {
                    let branch_id = BranchId::new(next_id);
                    let branch = &self.branches[next_id as usize];
                    let branch_next = branch.next_branch_in_queue;

                    // Turn local solution into a message and send it along
                    // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed
                    let solution_message = self.generate_initial_solution_for_branch(branch_id, conn_ctx);
                    if let Some(valid_solution) = solution_message {
                        self.submit_sync_solution(valid_solution, conn_ctx, delta_state);
                    } else {
                        // Branch is actually invalid, but we only just figured
                        // it out. We need to mark it as invalid to prevent
                        // future use
                        Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id);
                        if branch_id.index == self.sync_finished_last_handled {
                            self.sync_finished_last_handled = self.sync_finished.last;
                        }

                        let branch = &mut self.branches[next_id as usize];
                        branch.sync_state = SpeculativeState::Inconsistent;
                    }

                    match branch_next {
                        Some(id) => next_id = id,
                        None => break,
                    }
                }

                self.sync_finished_last_handled = next_id;
            }

            return scheduling;
        } else {
            let scheduling = self.run_in_deterministic_mode(sched_ctx, conn_ctx, delta_state);
            return scheduling;
        }
    }
}

impl ConnectorPDL {
    /// Constructs a representation of a connector. The assumption is that the
    /// initial branch is at the first instruction of the connector's code,
    /// hence is in a non-sync state.
    pub fn new(initial_branch: Branch, owned_ports: Vec<PortIdLocal>) -> Self {
        Self{
            in_sync: false,
            branches: vec![initial_branch],
            sync_active: BranchQueue::new(),
            sync_pending_get: BranchQueue::new(),
            sync_finished: BranchQueue::new(),
            sync_finished_last_handled: 0, // none at all
            cur_round: 0,
            committed_to: None,
            inbox: PrivateInbox::new(),
            ports: ConnectorPorts::new(owned_ports),
        }
    }

    pub fn is_in_sync_mode(&self) -> bool {
        return self.in_sync;
    }

    // -------------------------------------------------------------------------
    // Handling connector messages
    // -------------------------------------------------------------------------

    pub fn handle_data_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
        self.inbox.insert_message(target_port, message);
    }

    /// Accepts a synchronous message and combines it with the locally stored
    /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate.
    pub fn handle_sync_message(&mut self, message: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
        debug_assert!(!message.to_visit.contains(&ctx.id)); // own ID already removed
        debug_assert!(message.constraints.iter().any(|v| v.connector_id == ctx.id)); // we have constraints

        // TODO: Optimize, use some kind of temp workspace vector
        let mut execution_path_branch_ids = Vec::new();

        if self.sync_finished_last_handled != 0 {
            // We have some solutions to match against
            let constraints_index = message.constraints
                .iter()
                .position(|v| v.connector_id == ctx.id)
                .unwrap();
            let constraints = &message.constraints[constraints_index].constraints;
            debug_assert!(!constraints.is_empty());

            // Note that we only iterate over the solutions we've already
            // handled ourselves, not necessarily all of the finished branches.
            let mut branch_index = self.sync_finished.first;
            'branch_loop: loop {
                // Load solution branch
                let branch = &self.branches[branch_index as usize];
                execution_path_branch_ids.clear();
                self.branch_ids_of_execution_path(BranchId::new(branch_index), &mut execution_path_branch_ids);

                // Check if the branch matches all of the applied constraints
                for constraint in constraints {
                    match constraint {
                        SyncBranchConstraint::SilentPort(silent_port_id) => {
                            let port_index = self.ports.get_port_index(*silent_port_id);
                            if port_index.is_none() {
                                // Nefarious peer
                                continue 'branch_loop;
                            }
                            let port_index = port_index.unwrap();

                            let mapping = self.ports.get_port(branch_index, port_index);
                            debug_assert!(mapping.is_assigned);
                            if mapping.num_times_fired != 0 {
                                // Not silent, constraint not satisfied
                                continue 'branch_loop;
                            }
                        },
                        SyncBranchConstraint::BranchNumber(expected_branch_id) => {
                            if !execution_path_branch_ids.contains(expected_branch_id) {
                                // Not the expected execution path, constraint not satisfied
                                continue 'branch_loop;
                            }
                        },
                        SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => {
                            let port_index = self.ports.get_port_index(*port_id);
                            if port_index.is_none() {
                                // Nefarious peer
                                continue 'branch_loop;
                            }
                            let port_index = port_index.unwrap();

                            let mapping = self.ports.get_port(branch_index, port_index);
                            if mapping.last_registered_branch_id != *expected_branch_id {
                                // Not the expected interaction on this port, constraint not satisfied
                                continue 'branch_loop;
                            }
                        },
                    }
                }

                // If here, then all of the external constraints were satisfied
                // for the current branch. But the branch itself also imposes
                // constraints. So while building up the new solution, make sure
                // that those are satisfied as well.
                // TODO: Code below can probably be merged with initial solution
                // generation.
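                // In outline, the steps below: (1) clone the incoming partial
                // solution, (2) capture this branch's final port mapping,
                // (3) replace our constraint entry with a concrete local
                // solution, and (4) re-derive the constraints this branch
                // imposes on the connectors at the other end of each port.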
                // - clone old solution so we can add to it
                let mut new_solution = message.clone();

                // - determine the initial port mapping
                let num_ports = self.ports.num_ports();
                let mut new_solution_mapping = Vec::with_capacity(num_ports);
                for port_index in 0..self.ports.num_ports() {
                    let port_id = self.ports.get_port_id(port_index);
                    let mapping = self.ports.get_port(branch_index, port_index);
                    new_solution_mapping.push((port_id, mapping.last_registered_branch_id));
                }

                // - replace constraints with a local solution
                new_solution.constraints.remove(constraints_index);
                new_solution.local_solutions.push(SyncConnectorSolution{
                    connector_id: ctx.id,
                    terminating_branch_id: BranchId::new(branch_index),
                    execution_branch_ids: execution_path_branch_ids.clone(),
                    final_port_mapping: new_solution_mapping,
                });

                // - do a second pass on the ports to generate and add the
                //   constraints that should be applied to other connectors
                for port_index in 0..self.ports.num_ports() {
                    let port_id = self.ports.get_port_id(port_index);
                    let (peer_connector_id, peer_port_id, peer_is_getter) = {
                        let port = ctx.get_port(port_id);
                        (port.peer_connector, port.peer_id, port.kind == PortKind::Putter)
                    };

                    let mapping = self.ports.get_port(branch_index, port_index);
                    let constraint = if mapping.num_times_fired == 0 {
                        SyncBranchConstraint::SilentPort(peer_port_id)
                    } else {
                        if peer_is_getter {
                            SyncBranchConstraint::PortMapping(peer_port_id, mapping.last_registered_branch_id)
                        } else {
                            SyncBranchConstraint::BranchNumber(mapping.last_registered_branch_id)
                        }
                    };

                    match new_solution.add_or_check_constraint(peer_connector_id, constraint) {
                        Err(_) => continue 'branch_loop,
                        Ok(false) => continue 'branch_loop,
                        Ok(true) => {},
                    }
                }

                // If here, then the newly generated solution is completely
                // compatible.
                let next_branch = branch.next_branch_in_queue;
                self.submit_sync_solution(new_solution, ctx, results);

                // Consider the next branch
                if branch_index == self.sync_finished_last_handled {
                    // At the end of the previously handled solutions
                    break;
                }

                debug_assert!(next_branch.is_some()); // because we cannot be at the end of the queue
                branch_index = next_branch.unwrap();
            }
        }
    }

    fn handle_request_commit_message(&mut self, mut message: SolutionMessage, ctx: &ConnectorCtx, delta_state: &mut RunDeltaState) {
        let should_propagate_message = match &self.committed_to {
            Some((previous_origin, previous_comparison)) => {
                // Already committed to something.
                // So will commit to this if it takes precedence over the
                // current solution
                message.comparison_number > *previous_comparison ||
                    (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_origin.0)
            },
            None => {
                // Not yet committed to a solution, so commit to this one
                true
            }
        };

        if should_propagate_message {
            self.committed_to = Some((message.connector_origin, message.comparison_number));

            if message.to_visit.is_empty() {
                // Visited all of the connectors, so every connector can now
                // apply the solution
                // TODO: Use temporary workspace
                let mut to_visit = Vec::with_capacity(message.local_solutions.len() - 1);
                for (connector_id, _) in &message.local_solutions {
                    if *connector_id != ctx.id {
                        to_visit.push(*connector_id);
                    }
                }

                message.to_visit = to_visit;
                self.handle_confirm_commit_message(message.clone(), ctx, delta_state);
                delta_state.outbox.push(MessageContents::ConfirmCommit(message));
            } else {
                // Not yet visited all of the connectors
                delta_state.outbox.push(MessageContents::RequestCommit(message));
            }
        }
    }

    fn handle_confirm_commit_message(&mut self, message: SolutionMessage, ctx: &ConnectorCtx, _delta_state: &mut RunDeltaState) {
        // Make sure this is the message we actually committed to. As long as
        // we're running on a single machine this is fine.
        // TODO: Take care of nefarious peers
        let (expected_connector_id, expected_comparison_number) = self.committed_to.unwrap();
        assert_eq!(message.connector_origin, expected_connector_id);
        assert_eq!(message.comparison_number, expected_comparison_number);

        // Find the branch we're supposed to commit to
        let (_, branch_id) = message.local_solutions
            .iter()
            .find(|(id, _)| *id == ctx.id)
            .unwrap();
        let branch_id = *branch_id;

        // Commit to the branch. That is: move the solution branch to the first
        // of the connector's branches
        self.in_sync = false;
        self.branches.swap(0, branch_id.index as usize);
        self.branches.truncate(1); // TODO: Or drain and do not deallocate?
        let solution = &mut self.branches[0];

        // Clear all of the other sync-related variables
        self.sync_active.clear();
        self.sync_pending_get.clear();
        self.sync_finished.clear();
        self.sync_finished_last_handled = 0;
        self.cur_round += 1;
        self.committed_to = None;
        self.inbox.clear();
        self.ports.commit_to_sync();

        // Add/remove any of the ports we lost during the sync phase
        for port_delta in &solution.ports_delta {
            if port_delta.acquired {
                self.ports.add_port(port_delta.port_id);
            } else {
                self.ports.remove_port(port_delta.port_id);
            }
        }

        solution.commit_to_sync();
    }

    // -------------------------------------------------------------------------
    // Executing connector code
    // -------------------------------------------------------------------------

    /// Runs the connector in synchronous mode. Potential changes to the global
    /// system's state are added to the `RunDeltaState` object by the connector,
    /// where it is the caller's responsibility to immediately take care of
    /// those changes. The return value indicates when (and if) the connector
    /// needs to be scheduled again.
    pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, _context: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
        debug_assert!(self.in_sync);
        if self.sync_active.is_empty() {
            return ConnectorScheduling::NotNow;
        }

        let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active);

        // Run the branch to the next blocking point
        debug_assert!(branch.prepared_channel.is_none());
        let mut run_context = ConnectorRunContext{
            branch_index: branch.index.index,
            ports: &self.ports,
            ports_delta: &branch.ports_delta,
            scheduler: sched_ctx,
            prepared_channel: None,
            received: &branch.received,
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        // Match statement contains `return` statements only if the particular
        // run result being handled requires an immediate re-run of the
        // connector.
        match run_result {
            RunResult::BranchInconsistent => {
                // Speculative branch became inconsistent
                branch.sync_state = SpeculativeState::Inconsistent;
            },
            RunResult::BranchMissingPortState(port_id) => {
                // Branch called `fires()` on a port that does not yet have an
                // assigned speculative value. So we need to create those
                // branches
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let local_port_index = self.ports.get_port_index(local_port_id).unwrap();
                debug_assert!(self.ports.owned_ports.contains(&local_port_id));

                // Create two copied branches, one silent and one firing
                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
                let parent_branch_id = branch.index;
                let parent_branch = &self.branches[parent_branch_id.index as usize];

                let silent_index = self.branches.len() as u32;
                let firing_index = silent_index + 1;

                let silent_branch = Branch::new_sync_branching_from(silent_index, parent_branch);
                self.ports.prepare_sync_branch(parent_branch.index.index, silent_index);
                let firing_branch = Branch::new_sync_branching_from(firing_index, parent_branch);
                self.ports.prepare_sync_branch(parent_branch.index.index, firing_index);

                // Assign the port values of the two new branches
                let silent_port = self.ports.get_port_mut(silent_index, local_port_index);
                silent_port.mark_speculative(0);
                let firing_port = self.ports.get_port_mut(firing_index, local_port_index);
                firing_port.mark_speculative(1);

                // Run both branches again
                let silent_branch_id = silent_branch.index;
                self.branches.push(silent_branch);
                let firing_branch_id = firing_branch.index;
                self.branches.push(firing_branch);

                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch_id);
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch_id);

                return ConnectorScheduling::Immediate;
            },
            RunResult::BranchMissingPortValue(port_id) => {
                // Branch performed a `get` on a port that has not yet received
                // a value in its inbox.
                let local_port_id = PortIdLocal::new(port_id.0.u32_suffix);
                let local_port_index = self.ports.get_port_index(local_port_id);
                if local_port_index.is_none() {
                    todo!("deal with the case where the port is acquired");
                }
                let local_port_index = local_port_index.unwrap();
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);

                // Check for port mapping assignment and, if present, if it is
                // consistent
                let is_valid_get = if port_mapping.is_assigned {
                    assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires`
                    port_mapping.num_times_fired == 1
                } else {
                    // Not yet assigned
                    port_mapping.mark_speculative(1);
                    true
                };

                if is_valid_get {
                    // Mark as a branching point for future messages
                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
                    branch.halted_at_port = local_port_id;
                    let branch_id = branch.index;
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id);

                    // But if some messages can be immediately applied, do so
                    // now.
                    let messages = self.inbox.get_messages(local_port_id, port_mapping.last_registered_branch_id);
                    let mut did_have_messages = false;

                    for message in messages {
                        did_have_messages = true;

                        // For each message prepare a new branch to execute
                        let parent_branch = &self.branches[branch_id.index as usize];
                        let new_branch_index = self.branches.len() as u32;
                        let mut new_branch = Branch::new_sync_branching_from(new_branch_index, parent_branch);
                        self.ports.prepare_sync_branch(branch_id.index, new_branch_index);

                        let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index);
                        port_mapping.last_registered_branch_id = message.sender_cur_branch_id;
                        debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1);
                        new_branch.received.insert(local_port_id, message.clone());

                        // If the message contains any ports then they will now
                        // be owned by the new branch
                        debug_assert!(results.ports.is_empty());
                        find_ports_in_value_group(&message.message, &mut results.ports);
                        Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &results.ports);
                        results.ports.clear();

                        // Schedule the new branch
                        debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync);
                        let new_branch_id = new_branch.index;
                        self.branches.push(new_branch);
                        Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id);
                    }

                    if did_have_messages {
                        // If we did create any new branches, then we can run
                        // them immediately.
                        return ConnectorScheduling::Immediate;
                    }
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            RunResult::BranchAtSyncEnd => {
                // Branch is done, go through all of the ports that are not yet
                // assigned and map them to non-firing.
                for port_idx in 0..self.ports.num_ports() {
                    let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx);
                    if !port_mapping.is_assigned {
                        port_mapping.mark_speculative(0);
                    }
                }

                let branch_id = branch.index;
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch_id);
            },
            RunResult::BranchPut(port_id, value_group) => {
                // Branch performed a `put` on a particular port.
                let local_port_id = PortIdLocal{ index: port_id.0.u32_suffix };
                let local_port_index = self.ports.get_port_index(local_port_id);
                if local_port_index.is_none() {
                    todo!("handle case where port was received before (i.e. in ports_delta)");
                }
                let local_port_index = local_port_index.unwrap();

                // Check the port mapping for consistency
                // TODO: For now we can only put once, so that simplifies stuff
                let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index);
                let is_valid_put = if port_mapping.is_assigned {
                    // Already assigned, so must be speculative and one time
                    // firing, otherwise we are `put`ing multiple times.
                    if port_mapping.last_registered_branch_id.is_valid() {
                        // Already did a `put`
                        todo!("handle error through RunDeltaState");
                    } else {
                        // Valid if speculatively firing
                        port_mapping.num_times_fired == 1
                    }
                } else {
                    // Not yet assigned, do so now
                    true
                };

                if is_valid_put {
                    // Put in run results for thread to pick up and transfer to
                    // the correct connector inbox.
                    port_mapping.mark_definitive(branch.index, 1);
                    let message = DataMessage{
                        sending_port: local_port_id,
                        sender_prev_branch_id: BranchId::new_invalid(),
                        sender_cur_branch_id: branch.index,
                        message: value_group,
                    };

                    // If the message contains any ports then we release our
                    // ownership over them in this branch
                    debug_assert!(results.ports.is_empty());
                    find_ports_in_value_group(&message.message, &mut results.ports);
                    Self::release_ports_during_sync(&mut self.ports, branch, &results.ports).unwrap();
                    results.ports.clear();

                    results.outbox.push(MessageContents::Data(message));

                    let branch_index = branch.index;
                    Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, branch_index);
                    return ConnectorScheduling::Immediate;
                } else {
                    branch.sync_state = SpeculativeState::Inconsistent;
                }
            },
            _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result),
        }

        // Not immediately scheduling, so schedule again if there are more
        // branches to run
        if self.sync_active.is_empty() {
            return ConnectorScheduling::NotNow;
        } else {
            return ConnectorScheduling::Later;
        }
    }

    /// Runs the connector in non-synchronous mode.
    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, conn_ctx: &ConnectorCtx, results: &mut RunDeltaState) -> ConnectorScheduling {
        debug_assert!(!self.in_sync);
        debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty());
        debug_assert!(self.branches.len() == 1);

        let branch = &mut self.branches[0];
        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);

        let mut run_context = ConnectorRunContext{
            branch_index: branch.index.index,
            ports: &self.ports,
            ports_delta: &branch.ports_delta,
            scheduler: sched_ctx,
            prepared_channel: branch.prepared_channel.take(),
            received: &branch.received,
        };
        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);

        match run_result {
            RunResult::ComponentTerminated => {
                // Need to wait until all children are terminated
                // TODO: Think about how to do this?
                branch.sync_state = SpeculativeState::Finished;
                return ConnectorScheduling::Exit;
            },
            RunResult::ComponentAtSyncStart => {
                // Prepare for sync execution and reschedule immediately
                self.in_sync = true;
                let first_sync_branch = Branch::new_sync_branching_from(1, branch);
                let first_sync_branch_id = first_sync_branch.index;
                self.ports.prepare_sync_branch(0, 1);
                self.branches.push(first_sync_branch);
                Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch_id);

                return ConnectorScheduling::Later;
            },
            RunResult::NewComponent(definition_id, monomorph_idx, arguments) => {
                // Construction of a new component.
                // Find all references to ports inside of the arguments
                debug_assert!(results.ports.is_empty());
                find_ports_in_value_group(&arguments, &mut results.ports);
                if !results.ports.is_empty() {
                    // Ports changing ownership
                    if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &results.ports) {
                        todo!("fatal error handling");
                    }
                }

                // Add connector for later execution
                let new_connector_state = ComponentState{
                    prompt: Prompt::new(
                        &sched_ctx.runtime.protocol_description.types,
                        &sched_ctx.runtime.protocol_description.heap,
                        definition_id, monomorph_idx, arguments
                    ),
                };
                let new_connector_ports = results.ports.clone(); // TODO: Do something with this
                let new_connector_branch = Branch::new_initial_branch(new_connector_state);
                let new_connector = ConnectorPDL::new(new_connector_branch, new_connector_ports);
                results.new_connectors.push(new_connector);

                return ConnectorScheduling::Later;
            },
            RunResult::NewChannel => {
                // Need to prepare a new channel
                let (getter, putter) = sched_ctx.runtime.create_channel(conn_ctx.id);
                debug_assert_eq!(getter.kind, PortKind::Getter);
                branch.prepared_channel = Some((
                    Value::Input(PortId::new(putter.self_id.index)),
                    Value::Output(PortId::new(getter.self_id.index))
                ));

                results.new_ports.push(putter);
                results.new_ports.push(getter);

                return ConnectorScheduling::Immediate;
            },
            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
        }
    }

    // -------------------------------------------------------------------------
    // Internal helpers
    // -------------------------------------------------------------------------

    // Helpers for management of the branches and their internally stored
    // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming
    // linked lists inside of the vector of branches.

    /// Pops from front of linked-list branch queue.
    fn pop_branch_from_queue<'a>(branches: &'a mut Vec<Branch>, queue: &mut BranchQueue) -> &'a mut Branch {
        debug_assert!(queue.first != 0);
        let branch = &mut branches[queue.first as usize];
        queue.first = branch.next_branch_in_queue.unwrap_or(0);
        branch.next_branch_in_queue = None;

        if queue.first == 0 {
            // No more entries in queue
            debug_assert_eq!(queue.last, branch.index.index);
            queue.last = 0;
        }

        return branch;
    }

    /// Pushes branch at the end of the linked-list branch queue.
    fn push_branch_into_queue(
        branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_push: BranchId,
    ) {
        debug_assert!(to_push.is_valid());
        let to_push = to_push.index;

        if queue.last == 0 {
            // No branches in the queue at all
            debug_assert_eq!(queue.first, 0);
            branches[to_push as usize].next_branch_in_queue = None;
            queue.first = to_push;
            queue.last = to_push;
        } else {
            // Pre-existing branch in the queue
            debug_assert_ne!(queue.first, 0);
            branches[queue.last as usize].next_branch_in_queue = Some(to_push);
            queue.last = to_push;
        }
    }
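    // Illustrative note (queue layout): starting from an empty queue, pushing
    // branch 2 and then branch 5 yields `first == 2`, `last == 5`, and
    // `branches[2].next_branch_in_queue == Some(5)`. Popping then returns
    // branch 2 and leaves a single-element queue with `first == last == 5`.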
    /// Removes branch from linked-list queue. Walks through the entire list to
    /// find the element (!). Assumption is that this is not called often.
    fn remove_branch_from_queue(
        branches: &mut Vec<Branch>, queue: &mut BranchQueue, to_delete: BranchId,
    ) {
        debug_assert!(to_delete.is_valid()); // we're deleting a valid item
        debug_assert!(queue.first != 0 && queue.last != 0); // queue isn't empty to begin with

        // Retrieve branch and its next element
        let branch_to_delete = &mut branches[to_delete.index as usize];
        let branch_next_index_option = branch_to_delete.next_branch_in_queue;
        let branch_next_index_unwrapped = branch_next_index_option.unwrap_or(0);
        branch_to_delete.next_branch_in_queue = None;

        // Walk through all elements in queue to find branch to delete
        let mut prev_index = 0;
        let mut next_index = queue.first;
        while next_index != 0 {
            if next_index == to_delete.index {
                // Found the element we're going to delete
                // - check if at the first element or not
                if prev_index == 0 {
                    queue.first = branch_next_index_unwrapped;
                } else {
                    let prev_branch = &mut branches[prev_index as usize];
                    prev_branch.next_branch_in_queue = branch_next_index_option;
                }

                // - check if at last element or not (also takes care of "no elements left in queue")
                if branch_next_index_option.is_none() {
                    queue.last = prev_index;
                }

                return;
            }

            prev_index = next_index;
            let entry = &branches[next_index as usize];
            next_index = entry.next_branch_in_queue.unwrap_or(0);
        }

        // If here, then we didn't find the element
        panic!("branch does not exist in provided queue");
    }

    // Helpers for local port management. Specifically for adopting/losing
    // ownership over ports, and for checking if specific ports can be sent
    // over another port.

    /// Releasing ownership of ports while in non-sync mode. This only occurs
    /// while instantiating new connectors
    fn release_ports_during_non_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
        debug_assert!(!branch.index.is_valid()); // branch in non-sync mode
        for port_id in port_ids {
            // We must own the port, or something is wrong with our code
            todo!("Set up some kind of message router");
            debug_assert!(ports.get_port_index(*port_id).is_some());
            ports.remove_port(*port_id);
        }

        return Ok(());
    }

    /// Releasing ownership of ports during a sync-session. Will provide an
    /// error if the port was already used during a sync block.
    fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
        if port_ids.is_empty() {
            return Ok(());
        }
        todo!("unfinished: add port properties during final solution-commit msgs");

        debug_assert!(branch.index.is_valid()); // branch in sync mode
        for port_id in port_ids {
            match ports.get_port_index(*port_id) {
                Some(port_index) => {
                    // We (used to) own the port. Make sure it is not given away
                    // already and not used to put/get data.
                    let port_mapping = ports.get_port(branch.index.index, port_index);
                    if port_mapping.is_assigned && port_mapping.num_times_fired != 0 {
                        // Already used
                        return Err(PortOwnershipError::UsedInInteraction(*port_id));
                    }

                    for delta in &branch.ports_delta {
                        if delta.port_id == *port_id {
                            // We cannot have acquired this port, because the
                            // call to `ports.get_port_index` returned an index.
                            debug_assert!(!delta.acquired);
                            return Err(PortOwnershipError::AlreadyGivenAway(*port_id));
                        }
                    }

                    branch.ports_delta.push(PortOwnershipDelta{
                        acquired: false,
                        port_id: *port_id,
                    });
                },
                None => {
                    // Not in port mapping, so we must have acquired it before,
                    // remove the acquirement.
                    let mut to_delete_index: isize = -1;
                    for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
                        if delta.port_id == *port_id {
                            debug_assert!(delta.acquired);
                            to_delete_index = delta_idx as isize;
                            break;
                        }
                    }

                    debug_assert!(to_delete_index != -1);
                    branch.ports_delta.remove(to_delete_index as usize);
                }
            }
        }

        return Ok(());
    }

    /// Acquiring ports during a sync-session.
    fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> {
        if port_ids.is_empty() {
            return Ok(());
        }
        todo!("unfinished: add port properties during final solution-commit msgs");

        debug_assert!(branch.index.is_valid()); // branch in sync mode
        'port_loop: for port_id in port_ids {
            for (delta_idx, delta) in branch.ports_delta.iter().enumerate() {
                if delta.port_id == *port_id {
                    if delta.acquired {
                        // Somehow already received this port.
                        // TODO: @security
                        todo!("take care of nefarious peers");
                    } else {
                        // Sending ports to ourselves
                        debug_assert!(ports.get_port_index(*port_id).is_some());
                        branch.ports_delta.remove(delta_idx);
                        continue 'port_loop;
                    }
                }
            }

            // If here then we can safely acquire the new port
            branch.ports_delta.push(PortOwnershipDelta{
                acquired: true,
                port_id: *port_id,
            });
        }

        return Ok(());
    }

    // Helpers for generating and handling sync messages (and the solutions that
    // are described by those sync messages)

    /// Generates the initial solution for a finished sync branch. If the initial
    /// local solution is valid, then the appropriate message is returned.
    /// Otherwise the initial solution is inconsistent.
    fn generate_initial_solution_for_branch(&self, branch_id: BranchId, ctx: &ConnectorCtx) -> Option<SyncMessage> {
        // Retrieve branch
        debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode
        let branch = &self.branches[branch_id.index as usize];
        debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd);

        // Set up storage (this is also the storage for all of the connectors
        // that will be visited, hence the initial size approximation)
        let mut all_branch_ids = Vec::new();
        self.branch_ids_of_execution_path(branch_id, &mut all_branch_ids);

        let num_ports = self.ports.num_ports();
        let approximate_peers = num_ports;
        let mut initial_solution_port_mapping = Vec::with_capacity(num_ports);
        for port_idx in 0..self.ports.num_ports() {
            let port_id = self.ports.get_port_id(port_idx);
            let port_desc = self.ports.get_port(branch_id.index, port_idx);

            // Note: if assigned then we expect a valid branch ID. Otherwise we have the "invalid
            // branch" as ID, marking that we want it to be silent
            debug_assert!(port_desc.is_assigned == port_desc.last_registered_branch_id.is_valid());
            initial_solution_port_mapping.push((port_id, port_desc.last_registered_branch_id));
        }

        let initial_local_solution = SyncConnectorSolution{
            connector_id: ctx.id,
            terminating_branch_id: branch_id,
            execution_branch_ids: all_branch_ids,
            final_port_mapping: initial_solution_port_mapping,
        };

        let mut sync_message = SyncMessage::new(initial_local_solution, approximate_peers);

        // Turn local port mapping into constraints on other connectors
        // - constraints on other components due to transferred ports
        for port_delta in &branch.ports_delta {
            // For transferred ports we always have two constraints: one for the
            // sender and one for the receiver, ensuring it was not used.
            // TODO: This will fail if a port is passed around multiple times.
            //       maybe a special "passed along" entry in `ports_delta`.
            if !sync_message.check_constraint(ctx.id, SyncBranchConstraint::SilentPort(port_delta.port_id)).unwrap() {
                return None;
            }

            // Might need to check if we own the other side of the channel
            let port = ctx.get_port(port_delta.port_id);
            if !sync_message.add_or_check_constraint(port.peer_connector, SyncBranchConstraint::SilentPort(port.peer_id)).unwrap() {
                return None;
            }
        }

        // - constraints on other components due to owned ports
        for port_index in 0..self.ports.num_ports() {
            let port_id = self.ports.get_port_id(port_index);
            let port_mapping = self.ports.get_port(branch_id.index, port_index);
            let port = ctx.get_port(port_id);

            let constraint = if port_mapping.is_assigned {
                if port.kind == PortKind::Getter {
                    SyncBranchConstraint::BranchNumber(port_mapping.last_registered_branch_id)
                } else {
                    SyncBranchConstraint::PortMapping(port.peer_id, port_mapping.last_registered_branch_id)
                }
            } else {
                SyncBranchConstraint::SilentPort(port.peer_id)
            };

            if !sync_message.add_or_check_constraint(port.peer_connector, constraint).unwrap() {
                return None;
            }
        }

        return Some(sync_message);
    }

    fn submit_sync_solution(&mut self, partial_solution: SyncMessage, ctx: &ConnectorCtx, results: &mut RunDeltaState) {
        if partial_solution.to_visit.is_empty() {
            // Solution is completely consistent. So ask everyone to commit
            // TODO: Maybe another package for random?
            let comparison_number: u64 = unsafe {
                let mut random_array = [0u8; 8];
                getrandom::getrandom(&mut random_array).unwrap();
                std::mem::transmute(random_array)
            };

            let num_local = partial_solution.local_solutions.len();
            let mut full_solution = SolutionMessage{
                comparison_number,
                connector_origin: ctx.id,
                local_solutions: Vec::with_capacity(num_local),
                to_visit: Vec::with_capacity(num_local - 1),
            };

            for local_solution in &partial_solution.local_solutions {
                full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id));
                if local_solution.connector_id != ctx.id {
                    full_solution.to_visit.push(local_solution.connector_id);
                }
            }

            debug_assert!(self.committed_to.is_none());
            self.committed_to = Some((full_solution.connector_origin, full_solution.comparison_number));

            results.outbox.push(MessageContents::RequestCommit(full_solution));
        } else {
            // Still have connectors to visit
            results.outbox.push(MessageContents::Sync(partial_solution));
        }
    }

    fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec<BranchId>) {
        debug_assert!(parents.is_empty());

        let mut next_branch_id = leaf_branch_id;
        debug_assert!(next_branch_id.is_valid());

        while next_branch_id.is_valid() {
            parents.push(next_branch_id);
            let branch = &self.branches[next_branch_id.index as usize];
            next_branch_id = branch.parent_index;
        }
    }
}

/// A data structure passed to a connector whose code is being executed that is
/// used to queue up various state changes that have to be applied after
/// running, e.g. the messages that have to be transferred to other connectors.
// TODO: Come up with a better name
pub(crate) struct RunDeltaState {
    // Variables that allow the thread running the connector to pick up global
    // state changes and try to apply them.
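    // Field overview: `outbox` collects messages for the caller to route,
    // `new_connectors` and `new_ports` collect components and channels created
    // while running, and `ports` is a reusable scratch buffer for
    // `find_ports_in_value_group`.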
    pub outbox: Vec<MessageContents>,
    pub new_connectors: Vec<ConnectorPDL>,
    pub new_ports: Vec<Port>,
    // Workspaces
    pub ports: Vec<PortIdLocal>,
}

impl RunDeltaState {
    /// Constructs a new `RunDeltaState` object with the default amount of
    /// reserved memory
    pub fn new() -> Self {
        RunDeltaState{
            outbox: Vec::with_capacity(64),
            new_connectors: Vec::new(),
            new_ports: Vec::new(),
            ports: Vec::with_capacity(64),
        }
    }
}

#[derive(Eq, PartialEq)]
pub(crate) enum ConnectorScheduling {
    Immediate, // Run again, immediately
    Later,     // Schedule for running, at some later point in time
    NotNow,    // Do not reschedule for running
    Exit,      // Connector has exited
}

/// Recursively goes through the value group, attempting to find ports.
/// Duplicates will only be added once.
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
    // Helper to check a value for a port and recurse if needed.
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
        match value {
            Value::Input(port_id) | Value::Output(port_id) => {
                // This is an actual port
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
                for prev_port in ports.iter() {
                    if *prev_port == cur_port {
                        // Already added
                        return;
                    }
                }

                ports.push(cur_port);
            },
            Value::Array(heap_pos) |
            Value::Message(heap_pos) |
            Value::String(heap_pos) |
            Value::Struct(heap_pos) |
            Value::Union(_, heap_pos) => {
                // Reference to some dynamic thing which might contain ports,
                // so recurse
                let heap_region = &group.regions[*heap_pos as usize];
                for embedded_value in heap_region {
                    find_port_in_value(group, embedded_value, ports);
                }
            },
            _ => {}, // values we don't care about
        }
    }

    // Clear the ports, then scan all the available values
    ports.clear();
    for value in &value_group.values {
        find_port_in_value(value_group, value, ports);
    }
}
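// Illustrative sketch (not part of the original module): a couple of tests for
// the flattened `b*P+p` port-mapping layout and the branch-queue bookkeeping
// documented above. They assume `PortIdLocal::new(u32)` behaves as it is used
// elsewhere in this file (wrapping a raw port index); adjust if that API
// differs.
#[cfg(test)]
mod port_mapping_examples {
    use super::*;

    #[test]
    fn port_mapping_is_flattened_per_branch() {
        // Two ports owned by the connector, so every branch occupies two
        // consecutive slots in the mapping.
        let port_a = PortIdLocal::new(1);
        let port_b = PortIdLocal::new(2);
        let mut ports = ConnectorPorts::new(vec![port_a, port_b]);

        // Branch 0 (the non-sync branch) is created by `new`; branch 1 copies it.
        ports.prepare_sync_branch(0, 1);
        assert_eq!(ports.mapped_index(1, 0), 2); // b*P + p = 1*2 + 0
        assert_eq!(ports.mapped_index(1, 1), 3);

        // Marking a port in branch 1 does not affect branch 0.
        ports.get_port_mut(1, 0).mark_speculative(1);
        assert!(ports.get_port(1, 0).is_assigned);
        assert!(!ports.get_port(0, 0).is_assigned);
    }

    #[test]
    fn branch_queue_starts_empty_and_clears() {
        let mut queue = BranchQueue::new();
        assert!(queue.is_empty());

        // A queue with both endpoints set is non-empty; clearing resets it.
        queue.first = 1;
        queue.last = 1;
        assert!(!queue.is_empty());
        queue.clear();
        assert!(queue.is_empty());
    }
}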