diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index e72928313e678585399b6e2ae593b7d25a7d14dd..f60d038c8d9816830c301122a06d6ef70c5896dd 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -1,315 +1,46 @@ +// connector.rs +// +// Represents a component. A component (and the scheduler that is running it) +// has many properties that are not easy to subdivide into aspects that are +// conceptually handled by particular data structures. That is to say: the code +// that we run is responsible for running PDL code, keeping track of ports, instantiating +// new components and transports (i.e. interacting with the runtime), running +// a consensus algorithm, etc. But on the other hand, our data is rather +// simple: we have a speculative execution tree, a set of ports that we own, +// and a bit of code that we should run. +// +// So currently the code is organized as follows: +// - The scheduler that is running the component is the authoritative source on +// ports during *non-sync* mode. The consensus algorithm is the +// authoritative source during *sync* mode. They retrieve each other's +// state during the transitions. Hence port data is duplicated between +// these two data structures. +// - The execution tree is where executed branches reside. But the execution +// tree is only aware of the tree shape itself (and keeps track of some +// queues of branches that are in a particular state), and tends to store +// the PDL program state. The consensus algorithm is also somewhat aware +// of the execution tree, but only in terms of what is needed to complete +// a sync round (for now, that means the port mapping in each branch). +// Hence once more we have properties conceptually associated with branches +// in two places. +// - TODO: Write about handling messages, consensus wrapping data +// - TODO: Write about the way information is exchanged between PDL/component and scheduler through ctx + use std::collections::HashMap; use std::sync::atomic::AtomicBool; use crate::PortId; -use crate::protocol::{ComponentState, RunContext, RunResult}; +use crate::common::ComponentState; use crate::protocol::eval::{Prompt, Value, ValueGroup}; +use crate::protocol::{RunContext, RunResult}; -use super::ConnectorId; +use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; +use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; +use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox}; use super::native::Connector; -use super::scheduler::{ - SchedulerCtx, ComponentCtxFancy, ComponentPortChange, - ReceivedMessage -}; -use super::inbox::{ - PublicInbox, - DataMessage, SyncMessage, SolutionMessage, MessageContents, - SyncBranchConstraint, SyncConnectorSolution -}; use super::port::{PortKind, PortIdLocal}; +use super::scheduler::{ComponentCtx, SchedulerCtx}; -/// Represents the identifier of a branch (the index within its container). An -/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not -/// yet receive anything from any branch). 
-// TODO: Remove Debug derive -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct BranchId { - pub index: u32, -} - -impl BranchId { - fn new_invalid() -> Self { - Self{ index: 0 } - } - - fn new(index: u32) -> Self { - debug_assert!(index != 0); - Self{ index } - } - - #[inline] - pub(crate) fn is_valid(&self) -> bool { - return self.index != 0; - } -} - -#[derive(Debug, PartialEq, Eq)] -pub(crate) enum SpeculativeState { - // Non-synchronous variants - RunningNonSync, // regular execution of code - Error, // encountered a runtime error - Finished, // finished executing connector's code - // Synchronous variants - RunningInSync, // running within a sync block - HaltedAtBranchPoint, // at a branching point (at a `get` call) - ReachedSyncEnd, // reached end of sync block, branch represents a local solution - Inconsistent, // branch can never represent a local solution, so halted -} - -pub(crate) struct Branch { - index: BranchId, - parent_index: BranchId, - // Code execution state - code_state: ComponentState, - prepared_channel: Option<(Value, Value)>, - sync_state: SpeculativeState, - halted_at_port: PortIdLocal, // invalid if not halted - next_branch_in_queue: Option, - // Message/port state - received: HashMap, // TODO: @temporary, remove together with fires() - ports_delta: Vec, -} - -impl Branch { - /// Constructs a non-sync branch. It is assumed that the code is at the - /// first instruction - pub(crate) fn new_initial_branch(component_state: ComponentState) -> Self { - Branch{ - index: BranchId::new_invalid(), - parent_index: BranchId::new_invalid(), - code_state: component_state, - prepared_channel: None, - sync_state: SpeculativeState::RunningNonSync, - halted_at_port: PortIdLocal::new_invalid(), - next_branch_in_queue: None, - received: HashMap::new(), - ports_delta: Vec::new(), - } - } - - /// Constructs a sync branch. The provided branch is assumed to be the - /// parent of the new branch within the execution tree. 
- fn new_sync_branching_from(new_index: u32, parent_branch: &Branch) -> Self { - debug_assert!( - (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) || - (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) - ); - debug_assert!(parent_branch.prepared_channel.is_none()); - - Branch{ - index: BranchId::new(new_index), - parent_index: parent_branch.index, - code_state: parent_branch.code_state.clone(), - prepared_channel: None, - sync_state: SpeculativeState::RunningInSync, - halted_at_port: PortIdLocal::new_invalid(), - next_branch_in_queue: None, - received: parent_branch.received.clone(), - ports_delta: parent_branch.ports_delta.clone(), - } - } - - fn commit_to_sync(&mut self) { - // Logically impossible conditions (because we have a finished branch - // we are going to commit to) - debug_assert!(self.prepared_channel.is_none()); - debug_assert!(!self.halted_at_port.is_valid()); - - // Reset other variables to their defaults - self.index = BranchId::new_invalid(); - self.parent_index = BranchId::new_invalid(); - self.sync_state = SpeculativeState::RunningNonSync; - self.next_branch_in_queue = None; - self.received.clear(); - self.ports_delta.clear(); - } -} - -#[derive(Clone)] -struct PortAssignment { - is_assigned: bool, - last_registered_branch_id: BranchId, // invalid branch ID implies not assigned yet - num_times_fired: u32, -} - -impl PortAssignment { - fn new_unassigned() -> Self { - Self{ - is_assigned: false, - last_registered_branch_id: BranchId::new_invalid(), - num_times_fired: 0, - } - } - - #[inline] - fn mark_speculative(&mut self, num_times_fired: u32) { - debug_assert!(!self.last_registered_branch_id.is_valid()); - self.is_assigned = true; - self.num_times_fired = num_times_fired; - } - - #[inline] - fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) { - self.is_assigned = true; - self.last_registered_branch_id = branch_id; - self.num_times_fired = num_times_fired; - } -} - -#[derive(Debug)] -enum PortOwnershipError { - UsedInInteraction(PortIdLocal), - AlreadyGivenAway(PortIdLocal) -} - -/// Contains a description of the port mapping during a particular sync session. -/// TODO: Extend documentation -pub(crate) struct ConnectorPorts { - // Essentially a mapping from `port_index` to `port_id`. - pub owned_ports: Vec, - // Contains P*B entries, where P is the number of ports and B is the number - // of branches. One can find the appropriate mapping of port p at branch b - // at linear index `b*P+p`. - port_mapping: Vec -} - -impl ConnectorPorts { - /// Constructs the initial ports object. Assumes the presence of the - /// non-sync branch at index 0. Will initialize all entries for the non-sync - /// branch. - fn new(owned_ports: Vec) -> Self { - let num_ports = owned_ports.len(); - let mut port_mapping = Vec::with_capacity(num_ports); - for _ in 0..num_ports { - port_mapping.push(PortAssignment::new_unassigned()); - } - - Self{ owned_ports, port_mapping } - } - - /// Prepares the port mapping for a new branch. Assumes that there is no - /// intermediate branch index that we have skipped. 
- fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) { - let num_ports = self.owned_ports.len(); - let parent_base_idx = parent_branch_idx as usize * num_ports; - let new_base_idx = new_branch_idx as usize * num_ports; - - debug_assert!(parent_branch_idx < new_branch_idx); - debug_assert!(new_base_idx == self.port_mapping.len()); - - self.port_mapping.reserve(num_ports); - for offset in 0..num_ports { - let parent_port = &self.port_mapping[parent_base_idx + offset]; - let parent_port = parent_port.clone(); - self.port_mapping.push(parent_port); - } - } - - /// Adds a new port. Caller must make sure that the connector is not in the - /// sync phase. - fn add_port(&mut self, port_id: PortIdLocal) { - debug_assert!(self.port_mapping.len() == self.owned_ports.len()); - debug_assert!(!self.owned_ports.contains(&port_id)); - self.owned_ports.push(port_id); - self.port_mapping.push(PortAssignment::new_unassigned()); - } - - /// Commits to a particular branch. Essentially just removes the port - /// mapping information generated during the sync phase. - fn commit_to_sync(&mut self) { - self.port_mapping.truncate(self.owned_ports.len()); - debug_assert!(self.port_mapping.iter().all(|v| { - !v.is_assigned && !v.last_registered_branch_id.is_valid() - })); - } - - /// Removes a particular port from the connector. May only be done if the - /// connector is in non-sync mode - fn remove_port(&mut self, port_id: PortIdLocal) { - debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode - let port_index = self.get_port_index(port_id).unwrap(); - self.owned_ports.remove(port_index); - self.port_mapping.remove(port_index); - } - - /// Retrieves the index associated with a port id. Note that the port might - /// not exist (yet) if a speculative branch has just received the port. - /// TODO: But then again, one cannot use that port, right? - #[inline] - fn get_port_index(&self, port_id: PortIdLocal) -> Option { - for (idx, port) in self.owned_ports.iter().enumerate() { - if *port == port_id { - return Some(idx) - } - } - - return None - } - - /// Retrieves the ID associated with the port at the provided index - #[inline] - fn get_port_id(&self, port_index: usize) -> PortIdLocal { - return self.owned_ports[port_index]; - } - - #[inline] - fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment { - let mapped_idx = self.mapped_index(branch_idx, port_idx); - return &self.port_mapping[mapped_idx]; - } - - #[inline] - fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment { - let mapped_idx = self.mapped_index(branch_idx, port_idx); - return &mut self.port_mapping[mapped_idx]; - } - - #[inline] - fn num_ports(&self) -> usize { - return self.owned_ports.len(); - } - - - // Function for internal use: retrieve index in flattened port mapping array - // based on branch/port index. 
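// Illustrative sketch (not part of the original sources): the mapping is a
// dense, row-major table of B branches by P ports, so the assignment of
// port `p` in branch `b` lives at flat index `b * P + p`. For example, with
// 3 owned ports, branch 2's entry for port 1 sits at index 7:
//
//     let num_ports = 3usize;
//     let (branch_idx, port_idx) = (2usize, 1usize);
//     let flat_index = branch_idx * num_ports + port_idx;
//     assert_eq!(flat_index, 7);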
- #[inline] - fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize { - let branch_idx = branch_idx as usize; - let num_ports = self.owned_ports.len(); - - debug_assert!(port_idx < num_ports); - debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len()); - - return branch_idx * num_ports + port_idx; - } -} - -struct BranchQueue { - first: u32, - last: u32, -} - -impl BranchQueue { - #[inline] - fn new() -> Self { - Self{ first: 0, last: 0 } - } - - #[inline] - fn is_empty(&self) -> bool { - debug_assert!((self.first == 0) == (self.last == 0)); - return self.first == 0; - } - - #[inline] - fn clear(&mut self) { - self.first = 0; - self.last = 0; - } -} - -/// Public fields of the connector that can be freely shared between multiple -/// threads. pub(crate) struct ConnectorPublic { pub inbox: PublicInbox, pub sleeping: AtomicBool, @@ -324,65 +55,46 @@ impl ConnectorPublic { } } -// TODO: Maybe prevent false sharing by aligning `public` to next cache line. -// TODO: Do this outside of the connector, create a wrapping struct +#[derive(Eq, PartialEq)] +pub(crate) enum ConnectorScheduling { + Immediate, // Run again, immediately + Later, // Schedule for running, at some later point in time + NotNow, // Do not reschedule for running + Exit, // Connector has exited +} + pub(crate) struct ConnectorPDL { - // Branch management - branches: Vec<Branch>, // first branch is always non-speculative one - sync_active: BranchQueue, - sync_pending_get: BranchQueue, - sync_finished: BranchQueue, - sync_finished_last_handled: u32, // TODO: Change to BranchId? - cur_round: u32, - // Port/message management - pub committed_to: Option<(ConnectorId, u64)>, - pub ports: ConnectorPorts, + tree: ExecTree, + consensus: Consensus, } -// TODO: Remove this monstrosity struct ConnectorRunContext<'a> { - branch_index: u32, - ports: &'a ConnectorPorts, - ports_delta: &'a Vec<ComponentPortChange>, - received: &'a HashMap<PortIdLocal, DataMessage>, + branch_id: BranchId, + consensus: &'a Consensus, + received: &'a HashMap<PortIdLocal, ValueGroup>, scheduler: SchedulerCtx<'a>, prepared_channel: Option<(Value, Value)>, } -impl<'a> RunContext for ConnectorRunContext<'a> { +impl<'a> RunContext for ConnectorRunContext<'a>{ fn did_put(&mut self, port: PortId) -> bool { - if self.ports_delta.iter().any(|v| v.port.self_id.index == port.0.u32_suffix) { - // Either acquired or released, must be silent - return false; - } - - let port_index = self.ports.get_port_index(PortIdLocal::new(port.0.u32_suffix)).unwrap(); - let mapping = self.ports.get_port(self.branch_index, port_index); - return mapping.is_assigned; + let port_id = PortIdLocal::new(port.0.u32_suffix); + let annotation = self.consensus.get_annotation(self.branch_id, port_id); + return annotation.registered_id.is_some(); } fn get(&mut self, port: PortId) -> Option<ValueGroup> { let port_id = PortIdLocal::new(port.0.u32_suffix); match self.received.get(&port_id) { - Some(message) => Some(message.message.clone()), + Some(data) => Some(data.clone()), None => None, } } fn fires(&mut self, port: PortId) -> Option<Value> { let port_id = PortIdLocal::new(port.0.u32_suffix); - if self.ports_delta.iter().any(|v| v.port.self_id == port_id) { - return None - } - - let port_index = self.ports.get_port_index(port_id).unwrap(); - let mapping = self.ports.get_port(self.branch_index, port_index); - - if mapping.is_assigned { - return Some(Value::Bool(mapping.num_times_fired != 0)); - } else { - return None; - } + let annotation = self.consensus.get_annotation(self.branch_id, port_id); + return annotation.expected_firing.map(|v| Value::Bool(v)); } fn get_channel(&mut 
self) -> Option<(Value, Value)> { @@ -391,57 +103,16 @@ impl<'a> RunContext for ConnectorRunContext<'a> { } impl Connector for ConnectorPDL { - fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { self.handle_new_messages(comp_ctx); - if comp_ctx.is_in_sync() { - let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx); - - // When in speculative mode we might have generated new sync - // solutions, we need to turn them into proposed solutions here. - if self.sync_finished_last_handled != self.sync_finished.last { - // Retrieve first element in queue - let mut next_id; - if self.sync_finished_last_handled == 0 { - next_id = self.sync_finished.first; - } else { - let last_handled = &self.branches[self.sync_finished_last_handled as usize]; - debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue" - next_id = last_handled.next_branch_in_queue.unwrap(); - } - - loop { - let branch_id = BranchId::new(next_id); - let branch = &self.branches[next_id as usize]; - let branch_next = branch.next_branch_in_queue; - - // Turn local solution into a message and send it along - // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed - let solution_message = self.generate_initial_solution_for_branch(branch_id, comp_ctx); - if let Some(valid_solution) = solution_message { - self.submit_sync_solution(valid_solution, comp_ctx); - } else { - // Branch is actually invalid, but we only just figured - // it out. We need to mark it as invalid to prevent - // future use - Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id); - if branch_id.index == self.sync_finished_last_handled { - self.sync_finished_last_handled = self.sync_finished.last; - } - - let branch = &mut self.branches[next_id as usize]; - branch.sync_state = SpeculativeState::Inconsistent; - } - - match branch_next { - Some(id) => next_id = id, - None => break, - } - } - - self.sync_finished_last_handled = next_id; + if self.tree.is_in_sync() { + let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); + if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { + self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); + return ConnectorScheduling::Immediate; + } else { + return scheduling } - - return scheduling; } else { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); return scheduling; @@ -450,612 +121,241 @@ impl Connector for ConnectorPDL { } impl ConnectorPDL { - /// Constructs a representation of a connector. The assumption is that the - /// initial branch is at the first instruction of the connector's code, - /// hence is in a non-sync state. 
- pub fn new(initial_branch: Branch, owned_ports: Vec) -> Self { + pub fn new(initial: ComponentState) -> Self { Self{ - branches: vec![initial_branch], - sync_active: BranchQueue::new(), - sync_pending_get: BranchQueue::new(), - sync_finished: BranchQueue::new(), - sync_finished_last_handled: 0, // none at all - cur_round: 0, - committed_to: None, - ports: ConnectorPorts::new(owned_ports), + tree: ExecTree::new(initial), + consensus: Consensus::new(), } } - // ------------------------------------------------------------------------- - // Handling connector messages - // ------------------------------------------------------------------------- + // --- Handling messages - pub fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtxFancy) { - while let Some(message) = comp_ctx.read_next_message() { + pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) { + while let Some(message) = ctx.read_next_message() { match message { - ReceivedMessage::Data((target_port_id, contents)) => { - self.handle_data_message(target_port_id, &contents); - }, - ReceivedMessage::Sync(contents) => { - self.handle_sync_message(contents, comp_ctx); - }, - ReceivedMessage::RequestCommit(contents) => { - self.handle_request_commit_message(contents, comp_ctx); - }, - ReceivedMessage::ConfirmCommit(contents) => { - self.handle_confirm_commit_message(contents, comp_ctx); - }, - } - } - } - - pub fn handle_data_message(&mut self, target_port_id: PortIdLocal, message: &DataMessage) { - // Go through all branches that are waiting for a message - let mut branch_idx = self.sync_pending_get.first; - while branch_idx != 0 { - let branch = &self.branches[branch_idx as usize]; - let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0); - - let target_port_index = self.ports.get_port_index(target_port_id).unwrap(); - let port_mapping = self.ports.get_port(branch_idx, target_port_index); - - // Check if the branch may accept the message - if branch.sync_state == SpeculativeState::HaltedAtBranchPoint && - branch.halted_at_port == target_port_id && - port_mapping.last_registered_branch_id == message.sender_prev_branch_id - { - // Branch can accept. So fork it, and let the fork accept the - // message. The original branch stays waiting for new messages. - let new_branch_idx = self.branches.len() as u32; - let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch); - - self.ports.prepare_sync_branch(branch_idx, new_branch_idx); - let mapping = self.ports.get_port_mut(branch_idx, target_port_index); - mapping.last_registered_branch_id = message.sender_cur_branch_id; - - let new_branch_id = BranchId::new(new_branch_idx); - self.branches.push(new_branch); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id) + Message::Data(message) => self.handle_new_data_message(message, ctx), + Message::Sync(message) => self.handle_new_sync_message(message, ctx), + Message::Control(_) => unreachable!("control message in component"), } - - branch_idx = next_branch_idx; } } - /// Accepts a synchronous message and combines it with the locally stored - /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate. 
- pub fn handle_sync_message(&mut self, message: SyncMessage, comp_ctx: &mut ComponentCtxFancy) { - debug_assert!(!message.to_visit.contains(&comp_ctx.id)); // own ID already removed - debug_assert!(message.constraints.iter().any(|v| v.connector_id == comp_ctx.id)); // we have constraints - - // TODO: Optimize, use some kind of temp workspace vector - let mut execution_path_branch_ids = Vec::new(); - - if self.sync_finished_last_handled != 0 { - // We have some solutions to match against - let constraints_index = message.constraints - .iter() - .position(|v| v.connector_id == comp_ctx.id) - .unwrap(); - let constraints = &message.constraints[constraints_index].constraints; - debug_assert!(!constraints.is_empty()); - - // Note that we only iterate over the solutions we've already - // handled ourselves, not necessarily - let mut branch_index = self.sync_finished.first; - 'branch_loop: loop { - // Load solution branch - let branch = &self.branches[branch_index as usize]; - execution_path_branch_ids.clear(); - self.branch_ids_of_execution_path(BranchId::new(branch_index), &mut execution_path_branch_ids); - - // Check if the branch matches all of the applied constraints - for constraint in constraints { - match constraint { - SyncBranchConstraint::SilentPort(silent_port_id) => { - let port_index = self.ports.get_port_index(*silent_port_id); - if port_index.is_none() { - // Nefarious peer - continue 'branch_loop; - } - let port_index = port_index.unwrap(); - - let mapping = self.ports.get_port(branch_index, port_index); - debug_assert!(mapping.is_assigned); - - if mapping.num_times_fired != 0 { - // Not silent, constraint not satisfied - continue 'branch_loop; - } - }, - SyncBranchConstraint::BranchNumber(expected_branch_id) => { - if !execution_path_branch_ids.contains(expected_branch_id) { - // Not the expected execution path, constraint not satisfied - continue 'branch_loop; - } - }, - SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => { - let port_index = self.ports.get_port_index(*port_id); - if port_index.is_none() { - // Nefarious peer - continue 'branch_loop; - } - let port_index = port_index.unwrap(); - - let mapping = self.ports.get_port(branch_index, port_index); - if mapping.last_registered_branch_id != *expected_branch_id { - // Not the expected interaction on this port, constraint not satisfied - continue 'branch_loop; - } - }, - } - } - - // If here, then all of the external constraints were satisfied - // for the current branch. But the branch itself also imposes - // constraints. So while building up the new solution, make sure - // that those are satisfied as well. - // TODO: Code below can probably be merged with initial solution - // generation. + pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { + // Go through all branches that are awaiting new messages and see if + // there is one that can receive this message. 
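// A simplified sketch of the "fork on receive" pattern used below (names are
// approximate, not the exact ExecTree/Consensus API): for every branch that
// is allowed to accept the message we do roughly
//
//     let fork_id = tree.fork_branch(waiting_branch_id); // copy program state
//     tree[fork_id].inbox.insert(target_port, payload);  // deliver to the fork only
//     runnable_queue.push(fork_id);                       // fork resumes execution
//
// while the original branch stays halted, so it can still accept other,
// mutually exclusive messages that may arrive later in the same round.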
+ debug_assert!(ctx.workspace_branches.is_empty()); + let mut branches = Vec::new(); // TODO: @Remove + self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches); - // - clone old solution so we can add to it - let mut new_solution = message.clone(); - - // - determine the initial port mapping - let num_ports = self.ports.num_ports(); - let mut new_solution_mapping = Vec::with_capacity(num_ports); - for port_index in 0..self.ports.num_ports() { - let port_id = self.ports.get_port_id(port_index); - let mapping = self.ports.get_port(branch_index, port_index); - new_solution_mapping.push((port_id, mapping.last_registered_branch_id)); - } - - // - replace constraints with a local solution - new_solution.constraints.remove(constraints_index); - new_solution.local_solutions.push(SyncConnectorSolution{ - connector_id: comp_ctx.id, - terminating_branch_id: BranchId::new(branch_index), - execution_branch_ids: execution_path_branch_ids.clone(), - final_port_mapping: new_solution_mapping, - }); - - // - do a second pass on the ports to generate and add the - // constraints that should be applied to other connectors - for port_index in 0..self.ports.num_ports() { - let port_id = self.ports.get_port_id(port_index); - - let (peer_connector_id, peer_port_id, peer_is_getter) = { - let port = comp_ctx.get_port_by_id(port_id).unwrap(); - (port.peer_connector, port.peer_id, port.kind == PortKind::Putter) - }; - - let mapping = self.ports.get_port(branch_index, port_index); - let constraint = if mapping.num_times_fired == 0 { - SyncBranchConstraint::SilentPort(peer_port_id) - } else { - if peer_is_getter { - SyncBranchConstraint::PortMapping(peer_port_id, mapping.last_registered_branch_id) - } else { - SyncBranchConstraint::BranchNumber(mapping.last_registered_branch_id) - } - }; - - match new_solution.add_or_check_constraint(peer_connector_id, constraint) { - Err(_) => continue 'branch_loop, - Ok(false) => continue 'branch_loop, - Ok(true) => {}, - } - } + for branch_id in branches.drain(..) { + // This branch can receive, so fork and given it the message + let receiving_branch_id = self.tree.fork_branch(branch_id); + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + let receiving_branch = &mut self.tree[receiving_branch_id]; - // If here, then the newly generated solution is completely - // compatible. - let next_branch = branch.next_branch_in_queue; - self.submit_sync_solution(new_solution, comp_ctx); + receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); + self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); - // Consider the next branch - if branch_index == self.sync_finished_last_handled { - // At the end of the previously handled solutions - break; - } - - debug_assert!(next_branch.is_some()); // because we cannot be at the end of the queue - branch_index = next_branch.unwrap(); - } + // And prepare the branch for running + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); } } - fn handle_request_commit_message(&mut self, mut message: SolutionMessage, comp_ctx: &mut ComponentCtxFancy) { - let should_propagate_message = match &self.committed_to { - Some((previous_origin, previous_comparison)) => { - // Already committed to something. 
So will commit to this if it - // takes precedence over the current solution - message.comparison_number > *previous_comparison || - (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_origin.0) - }, - None => { - // Not yet committed to a solution, so commit to this one - true - } - }; - - if should_propagate_message { - self.committed_to = Some((message.connector_origin, message.comparison_number)); - - if message.to_visit.is_empty() { - // Visited all of the connectors, so every connector can now - // apply the solution - // TODO: Use temporary workspace - let mut to_visit = Vec::with_capacity(message.local_solutions.len() - 1); - for (connector_id, _) in &message.local_solutions { - if *connector_id != comp_ctx.id { - to_visit.push(*connector_id); - } - } - - message.to_visit = to_visit; - comp_ctx.submit_message(MessageContents::ConfirmCommit(message.clone())); - self.handle_confirm_commit_message(message, comp_ctx); - } else { - // Not yet visited all of the connectors - comp_ctx.submit_message(MessageContents::RequestCommit(message)); - } + pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { + if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { + self.collapse_sync_to_solution_branch(solution_branch_id, ctx); } } - fn handle_confirm_commit_message(&mut self, message: SolutionMessage, comp_ctx: &mut ComponentCtxFancy) { - // Make sure this is the message we actually committed to. As long as - // we're running on a single machine this is fine. - // TODO: Take care of nefarious peers - let (expected_connector_id, expected_comparison_number) = - self.committed_to.unwrap(); - assert_eq!(message.connector_origin, expected_connector_id); - assert_eq!(message.comparison_number, expected_comparison_number); - - // Find the branch we're supposed to commit to - let (_, branch_id) = message.local_solutions - .iter() - .find(|(id, _)| *id == comp_ctx.id) - .unwrap(); - let branch_id = *branch_id; - - // Commit to the branch. That is: move the solution branch to the first - // of the connector's branches - self.branches.swap(0, branch_id.index as usize); - self.branches.truncate(1); // TODO: Or drain and do not deallocate? - let solution_branch = &mut self.branches[0]; + // --- Running code - // Clear all of the other sync-related variables - self.sync_active.clear(); - self.sync_pending_get.clear(); - self.sync_finished.clear(); - self.sync_finished_last_handled = 0; - self.cur_round += 1; - - self.committed_to = None; - self.ports.commit_to_sync(); - - // Add/remove any of the ports we lost during the sync phase - // TODO: Probably might not need this with the port syncing - for port_delta in &solution_branch.ports_delta { - if port_delta.is_acquired { - self.ports.add_port(port_delta.port.self_id); - } else { - self.ports.remove_port(port_delta.port.self_id); - } - } - - comp_ctx.notify_sync_end(&solution_branch.ports_delta); - solution_branch.commit_to_sync(); - } - - // ------------------------------------------------------------------------- - // Executing connector code - // ------------------------------------------------------------------------- - - /// Runs the connector in synchronous mode. Potential changes to the global - /// system's state are added to the `RunDeltaState` object by the connector, - /// where it is the caller's responsibility to immediately take care of - /// those changes. 
The return value indicates when (and if) the connector - /// needs to be scheduled again. - pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { - debug_assert!(comp_ctx.is_in_sync()); - - if self.sync_active.is_empty() { + pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + // Check if we have any branch that needs running + debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync()); + let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); + if branch_id.is_none() { return ConnectorScheduling::NotNow; } - let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active); + // Retrieve the branch and run it + let branch_id = branch_id.unwrap(); + let branch = &mut self.tree[branch_id]; - // Run the branch to the next blocking point - debug_assert!(branch.prepared_channel.is_none()); - let mut run_context = ConnectorRunContext { - branch_index: branch.index.index, - ports: &self.ports, - ports_delta: &branch.ports_delta, + let mut run_context = ConnectorRunContext{ + branch_id, + consensus: &self.consensus, + received: &branch.inbox, scheduler: sched_ctx, - prepared_channel: None, - received: &branch.received, + prepared_channel: branch.prepared_channel.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); - // Match statement contains `return` statements only if the particular - // run result behind handled requires an immediate re-run of the - // connector. + // Handle the returned result. Note that this match statement contains + // explicit returns in case the run result requires that the component's + // code is run again immediately match run_result { RunResult::BranchInconsistent => { - // Speculative branch became inconsistent + // Branch became inconsistent branch.sync_state = SpeculativeState::Inconsistent; }, RunResult::BranchMissingPortState(port_id) => { - // Branch called `fires()` on a port that does not yet have an - // assigned speculative value. So we need to create those - // branches - let local_port_id = PortIdLocal::new(port_id.0.u32_suffix); - let local_port_index = self.ports.get_port_index(local_port_id).unwrap(); - - debug_assert!(self.ports.owned_ports.contains(&local_port_id)); + // Branch called `fires()` on a port that has not been used yet. 
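// In simplified terms (hypothetical helper names, not the actual consensus
// API): the branch is split into two forks, one assuming the port fires and
// one assuming it stays silent, and both are scheduled:
//
//     let firing = tree.fork_branch(branch_id);
//     let silent = tree.fork_branch(branch_id);
//     consensus.assume_firing(firing, port_id, true);
//     consensus.assume_firing(silent, port_id, false);
//
// Forks whose assumption later turns out to be inconsistent are abandoned;
// only consistent forks can contribute a local solution to the sync round.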
+ let port_id = PortIdLocal::new(port_id.0.u32_suffix); - // Create two copied branches, one silent and one firing + // Create two forks, one that assumes the port will fire, and + // one that assumes the port remains silent branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - let parent_branch_id = branch.index; - let parent_branch = &self.branches[parent_branch_id.index as usize]; - - let silent_index = self.branches.len() as u32; - let firing_index = silent_index + 1; - - let silent_branch = Branch::new_sync_branching_from(silent_index, parent_branch); - self.ports.prepare_sync_branch(parent_branch.index.index, silent_index); - - let firing_branch = Branch::new_sync_branching_from(firing_index, parent_branch); - self.ports.prepare_sync_branch(parent_branch.index.index, firing_index); - // Assign the port values of the two new branches - let silent_port = self.ports.get_port_mut(silent_index, local_port_index); - silent_port.mark_speculative(0); + let firing_branch_id = self.tree.fork_branch(branch_id); + let silent_branch_id = self.tree.fork_branch(branch_id); + self.consensus.notify_of_new_branch(branch_id, firing_branch_id); + let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true); + debug_assert_eq!(_result, Consistency::Valid); + self.consensus.notify_of_new_branch(branch_id, silent_branch_id); + let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false); + debug_assert_eq!(_result, Consistency::Valid); - let firing_port = self.ports.get_port_mut(firing_index, local_port_index); - firing_port.mark_speculative(1); - - // Run both branches again - let silent_branch_id = silent_branch.index; - self.branches.push(silent_branch); - let firing_branch_id = firing_branch.index; - self.branches.push(firing_branch); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch_id); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch_id); + // Somewhat important: we push the firing one first, such that + // that branch is ran again immediately. + self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id); + self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id); return ConnectorScheduling::Immediate; }, RunResult::BranchMissingPortValue(port_id) => { - // Branch performed a `get` on a port that has not yet received - // a value in its inbox. - let local_port_id = PortIdLocal::new(port_id.0.u32_suffix); - let local_port_index = self.ports.get_port_index(local_port_id); - if local_port_index.is_none() { - todo!("deal with the case where the port is acquired"); - } - let local_port_index = local_port_index.unwrap(); - let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index); - - // Check for port mapping assignment and, if present, if it is - // consistent - let is_valid_get = if port_mapping.is_assigned { - assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires` - port_mapping.num_times_fired == 1 - } else { - // Not yet assigned - port_mapping.mark_speculative(1); - true - }; - - if is_valid_get { - // Mark as a branching point for future messages + // Branch performed a `get()` on a port that does not have a + // received message on that port. 
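// Conceptually (hypothetical names, not the exact API) the handling below is:
//
//     consensus.assume_firing(branch_id, port_id, true); // a get() implies the port fires
//     awaiting_queue.push(branch_id);                     // park until a message arrives
//     for msg in messages_already_buffered(port_id) {     // replay earlier arrivals
//         fork_and_deliver(branch_id, msg);
//     }
//
// The replay step matters because a message may have been received before
// the branch ever reached its `get()` call.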
+ let port_id = PortIdLocal::new(port_id.0.u32_suffix); + let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); + if consistency == Consistency::Valid { + // `get()` is valid, so mark the branch as awaiting a message branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - branch.halted_at_port = local_port_id; - let branch_id = branch.index; - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id); - - // But if some messages can be immediately applied, do so - // now. - let messages = comp_ctx.get_read_data_messages(local_port_id, port_mapping.last_registered_branch_id); - let mut did_have_messages = false; - - for message in messages { - did_have_messages = true; - - // For each message prepare a new branch to execute - let parent_branch = &self.branches[branch_id.index as usize]; - let new_branch_index = self.branches.len() as u32; - let mut new_branch = Branch::new_sync_branching_from(new_branch_index, parent_branch); - self.ports.prepare_sync_branch(branch_id.index, new_branch_index); - - let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index); - port_mapping.last_registered_branch_id = message.sender_cur_branch_id; - debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1); - - new_branch.received.insert(local_port_id, message.clone()); - - // If the message contains any ports then they will now - // be owned by the new branch - let mut transferred_ports = Vec::new(); // TODO: Create workspace somewhere - find_ports_in_value_group(&message.message, &mut transferred_ports); - Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &transferred_ports); - - // Schedule the new branch - debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync); - let new_branch_id = new_branch.index; - self.branches.push(new_branch); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id); + branch.awaiting_port = port_id; + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + // Note: we only know that a branch is waiting on a message when + // it reaches the `get` call. But we might have already received + // a message that targets this branch, so check now. + let mut any_branch_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; + + branch.insert_message(port_id, message.content.as_message().unwrap().clone()); + + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + + any_branch_received = true; + } } - if did_have_messages { - // If we did create any new branches, then we can run - // them immediately. + if any_branch_received { return ConnectorScheduling::Immediate; } } else { branch.sync_state = SpeculativeState::Inconsistent; } - }, + } RunResult::BranchAtSyncEnd => { - // Branch is done, go through all of the ports that are not yet - // assigned and map them to non-firing. 
- for port_idx in 0..self.ports.num_ports() { - let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx); - if !port_mapping.is_assigned { - port_mapping.mark_speculative(0); - } + let consistency = self.consensus.notify_of_finished_branch(branch_id); + if consistency == Consistency::Valid { + branch.sync_state = SpeculativeState::ReachedSyncEnd; + self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); + } else if consistency == Consistency::Inconsistent { + branch.sync_state = SpeculativeState::Inconsistent; } - - let branch_id = branch.index; - branch.sync_state = SpeculativeState::ReachedSyncEnd; - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch_id); }, - RunResult::BranchPut(port_id, value_group) => { - // Branch performed a `put` on a particualar port. - let local_port_id = PortIdLocal{ index: port_id.0.u32_suffix }; - let local_port_index = self.ports.get_port_index(local_port_id); - if local_port_index.is_none() { - todo!("handle case where port was received before (i.e. in ports_delta)") - } - let local_port_index = local_port_index.unwrap(); - - // Check the port mapping for consistency - // TODO: For now we can only put once, so that simplifies stuff - let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index); - let is_valid_put = if port_mapping.is_assigned { - // Already assigned, so must be speculative and one time - // firing, otherwise we are `put`ing multiple times. - if port_mapping.last_registered_branch_id.is_valid() { - // Already did a `put` - todo!("handle error through RunDeltaState"); - } else { - // Valid if speculatively firing - port_mapping.num_times_fired == 1 - } - } else { - // Not yet assigned, do so now - true - }; - - if is_valid_put { - // Put in run results for thread to pick up and transfer to - // the correct connector inbox. - port_mapping.mark_definitive(branch.index, 1); - let message = DataMessage{ - sending_port: local_port_id, - sender_prev_branch_id: BranchId::new_invalid(), - sender_cur_branch_id: branch.index, - message: value_group, - }; - - // If the message contains any ports then we release our - // ownership over them in this branch - let mut transferred_ports = Vec::new(); // TODO: Put in some temp workspace - find_ports_in_value_group(&message.message, &mut transferred_ports); - Self::release_ports_during_sync(&mut self.ports, branch, &transferred_ports).unwrap(); - - comp_ctx.submit_message(MessageContents::Data(message)); - - let branch_index = branch.index; - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, branch_index); - return ConnectorScheduling::Immediate + RunResult::BranchPut(port_id, content) => { + // Branch is attempting to send data + let port_id = PortIdLocal::new(port_id.0.u32_suffix); + let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); + if consistency == Consistency::Valid { + // `put()` is valid. 
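// Sketch of the message produced below (field meanings approximate): the
// outgoing value group is wrapped as
//
//     DataMessage {
//         sync_header,                  // round/branch bookkeeping for consensus
//         data_header,                  // addressing, e.g. the target port
//         content: DataContent::Message(value_group),
//     }
//
// so the receiving component can decide, per speculative branch, whether the
// message is admissible in its current port mapping.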
+ let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + comp_ctx.submit_message(Message::Data(DataMessage { + sync_header, data_header, + content: DataContent::Message(content), + })); + + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; } else { branch.sync_state = SpeculativeState::Inconsistent; } }, - _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result), + _ => unreachable!("unexpected run result {:?} in sync mode", run_result), } - // Not immediately scheduling, so schedule again if there are more - // branches to run - if self.sync_active.is_empty() { + // If here then the run result did not require a particular action. We + // return whether we have more active branches to run or not. + if self.tree.queue_is_empty(QueueKind::Runnable) { return ConnectorScheduling::NotNow; } else { return ConnectorScheduling::Later; } } - /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { - debug_assert!(!comp_ctx.is_in_sync()); - debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); - debug_assert!(self.branches.len() == 1); + pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync()); - let branch = &mut self.branches[0]; + let branch = self.tree.base_branch_mut(); debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); let mut run_context = ConnectorRunContext{ - branch_index: branch.index.index, - ports: &self.ports, - ports_delta: &branch.ports_delta, + branch_id: branch.id, + consensus: &self.consensus, + received: &branch.inbox, scheduler: sched_ctx, prepared_channel: branch.prepared_channel.take(), - received: &branch.received, }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); match run_result { RunResult::ComponentTerminated => { - // Need to wait until all children are terminated - // TODO: Think about how to do this? branch.sync_state = SpeculativeState::Finished; + return ConnectorScheduling::Exit; }, RunResult::ComponentAtSyncStart => { - // Prepare for sync execution and reschedule immediately - // TODO: Not sure about this. I want a clear synchronization - // point between scheduler/component view on the ports. But is - // this the way to do it? - let current_ports = comp_ctx.notify_sync_start(); - for port in current_ports { - debug_assert!(self.ports.get_port_index(port.self_id).is_some()); - } + comp_ctx.notify_sync_start(); + let sync_branch_id = self.tree.start_sync(); + self.consensus.start_sync(comp_ctx); + self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); + self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); - let first_sync_branch = Branch::new_sync_branching_from(1, branch); - let first_sync_branch_id = first_sync_branch.index; - self.ports.prepare_sync_branch(0, 1); - self.branches.push(first_sync_branch); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch_id); - - return ConnectorScheduling::Later; + return ConnectorScheduling::Immediate; }, RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { - // Construction of a new component. 
Find all references to ports - // inside of the arguments - let mut transferred_ports = Vec::new(); - find_ports_in_value_group(&arguments, &mut transferred_ports); - - if !transferred_ports.is_empty() { - // Ports changing ownership - if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &transferred_ports) { - todo!("fatal error handling"); - } - } + // Note: we're relinquishing ownership of ports. But because + // we are in non-sync mode the scheduler will handle and check + // port ownership transfer. + debug_assert!(comp_ctx.workspace_ports.is_empty()); + find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports); - // Add connector for later execution - let new_connector_state = ComponentState { + let new_state = ComponentState { prompt: Prompt::new( &sched_ctx.runtime.protocol_description.types, &sched_ctx.runtime.protocol_description.heap, definition_id, monomorph_idx, arguments - ) + ), }; - - let new_connector_branch = Branch::new_initial_branch(new_connector_state); - let new_connector = ConnectorPDL::new(new_connector_branch, transferred_ports); - - comp_ctx.push_component(new_connector); + let new_component = ConnectorPDL::new(new_state); + comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone()); + comp_ctx.workspace_ports.clear(); return ConnectorScheduling::Later; }, RunResult::NewChannel => { - // Need to prepare a new channel let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); - debug_assert_eq!(getter.kind, PortKind::Getter); + debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); branch.prepared_channel = Some(( Value::Input(PortId::new(putter.self_id.index)), - Value::Output(PortId::new(getter.self_id.index)) + Value::Output(PortId::new(getter.self_id.index)), )); comp_ctx.push_port(putter); @@ -1067,385 +367,16 @@ impl ConnectorPDL { } } - // ------------------------------------------------------------------------- - // Internal helpers - // ------------------------------------------------------------------------- - - // Helpers for management of the branches and their internally stored - // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming - // linked lists inside of the vector of branches. + pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) { + let mut fake_vec = Vec::new(); + self.tree.end_sync(solution_branch_id); + self.consensus.end_sync(solution_branch_id, &mut fake_vec); - /// Pops from front of linked-list branch queue. - fn pop_branch_from_queue<'a>(branches: &'a mut Vec, queue: &mut BranchQueue) -> &'a mut Branch { - debug_assert!(queue.first != 0); - let branch = &mut branches[queue.first as usize]; - queue.first = branch.next_branch_in_queue.unwrap_or(0); - branch.next_branch_in_queue = None; - - if queue.first == 0 { - // No more entries in queue - debug_assert_eq!(queue.last, branch.index.index); - queue.last = 0; + for port in fake_vec { + // TODO: Handle sent/received ports + debug_assert!(ctx.get_port_by_id(port).is_some()); } - return branch; - } - - /// Pushes branch at the end of the linked-list branch queue. 
- fn push_branch_into_queue( - branches: &mut Vec, queue: &mut BranchQueue, to_push: BranchId, - ) { - debug_assert!(to_push.is_valid()); - let to_push = to_push.index; - - if queue.last == 0 { - // No branches in the queue at all - debug_assert_eq!(queue.first, 0); - branches[to_push as usize].next_branch_in_queue = None; - queue.first = to_push; - queue.last = to_push; - } else { - // Pre-existing branch in the queue - debug_assert_ne!(queue.first, 0); - branches[queue.last as usize].next_branch_in_queue = Some(to_push); - queue.last = to_push; - } - } - - /// Removes branch from linked-list queue. Walks through the entire list to - /// find the element (!). Assumption is that this is not called often. - fn remove_branch_from_queue( - branches: &mut Vec, queue: &mut BranchQueue, to_delete: BranchId, - ) { - debug_assert!(to_delete.is_valid()); // we're deleting a valid item - debug_assert!(queue.first != 0 && queue.last != 0); // queue isn't empty to begin with - - // Retrieve branch and its next element - let branch_to_delete = &mut branches[to_delete.index as usize]; - let branch_next_index_option = branch_to_delete.next_branch_in_queue; - let branch_next_index_unwrapped = branch_next_index_option.unwrap_or(0); - branch_to_delete.next_branch_in_queue = None; - - // Walk through all elements in queue to find branch to delete - let mut prev_index = 0; - let mut next_index = queue.first; - - while next_index != 0 { - if next_index == to_delete.index { - // Found the element we're going to delete - // - check if at the first element or not - if prev_index == 0 { - queue.first = branch_next_index_unwrapped; - } else { - let prev_branch = &mut branches[prev_index as usize]; - prev_branch.next_branch_in_queue = branch_next_index_option; - } - - // - check if at last element or not (also takes care of "no elements left in queue") - if branch_next_index_option.is_none() { - queue.last = prev_index; - } - - return; - } - - prev_index = next_index; - let entry = &branches[next_index as usize]; - next_index = entry.next_branch_in_queue.unwrap_or(0); - } - - // If here, then we didn't find the element - panic!("branch does not exist in provided queue"); - } - - // Helpers for local port management. Specifically for adopting/losing - // ownership over ports, and for checking if specific ports can be sent - // over another port. - - /// Releasing ownership of ports while in non-sync mode. This only occurs - /// while instantiating new connectors - fn release_ports_during_non_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { - debug_assert!(!branch.index.is_valid()); // branch in non-sync mode - - for port_id in port_ids { - // We must own the port, or something is wrong with our code - todo!("Set up some kind of message router"); - debug_assert!(ports.get_port_index(*port_id).is_some()); - ports.remove_port(*port_id); - } - - return Ok(()) - } - - /// Releasing ownership of ports during a sync-session. Will provide an - /// error if the port was already used during a sync block. - fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { - if port_ids.is_empty() { - return Ok(()) - } - - todo!("unfinished: add port properties during final solution-commit msgs"); - debug_assert!(branch.index.is_valid()); // branch in sync mode - - for port_id in port_ids { - match ports.get_port_index(*port_id) { - Some(port_index) => { - // We (used to) own the port. 
Make sure it is not given away - // already and not used to put/get data. - let port_mapping = ports.get_port(branch.index.index, port_index); - if port_mapping.is_assigned && port_mapping.num_times_fired != 0 { - // Already used - return Err(PortOwnershipError::UsedInInteraction(*port_id)); - } - - for delta in &branch.ports_delta { - if delta.port.self_id == *port_id { - // We cannot have acquired this port, because the - // call to `ports.get_port_index` returned an index. - debug_assert!(!delta.is_acquired); - return Err(PortOwnershipError::AlreadyGivenAway(*port_id)); - } - } - - // TODO: Obtain port description - // branch.ports_delta.push(ComponentPortChange{ - // is_acquired: false, - // port_id: *port_id, - // }); - }, - None => { - // Not in port mapping, so we must have acquired it before, - // remove the acquirement. - let mut to_delete_index: isize = -1; - for (delta_idx, delta) in branch.ports_delta.iter().enumerate() { - if delta.port.self_id == *port_id { - debug_assert!(delta.is_acquired); - to_delete_index = delta_idx as isize; - break; - } - } - - debug_assert!(to_delete_index != -1); - branch.ports_delta.remove(to_delete_index as usize); - } - } - } - - return Ok(()) - } - - /// Acquiring ports during a sync-session. - fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { - if port_ids.is_empty() { - return Ok(()) - } - - todo!("unfinished: add port properties during final solution-commit msgs"); - debug_assert!(branch.index.is_valid()); // branch in sync mode - - 'port_loop: for port_id in port_ids { - for (delta_idx, delta) in branch.ports_delta.iter().enumerate() { - if delta.port.self_id == *port_id { - if delta.is_acquired { - // Somehow already received this port. - // TODO: @security - todo!("take care of nefarious peers"); - } else { - // Sending ports to ourselves - debug_assert!(ports.get_port_index(*port_id).is_some()); - branch.ports_delta.remove(delta_idx); - continue 'port_loop; - } - } - } - - // If here then we can safely acquire the new port - // TODO: Retrieve port infor - // branch.ports_delta.push(PortOwnershipDelta{ - // acquired: true, - // port_id: *port_id, - // }); - } - - return Ok(()) - } - - // Helpers for generating and handling sync messages (and the solutions that - // are described by those sync messages) - - /// Generates the initial solution for a finished sync branch. If initial - /// local solution is valid, then the appropriate message is returned. - /// Otherwise the initial solution is inconsistent. 
- fn generate_initial_solution_for_branch(&self, branch_id: BranchId, comp_ctx: &ComponentCtxFancy) -> Option { - // Retrieve branchh - debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode - let branch = &self.branches[branch_id.index as usize]; - debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd); - - // Set up storage (this is also the storage for all of the connectors - // that will be visited, hence the initial size approximation) - let mut all_branch_ids = Vec::new(); - self.branch_ids_of_execution_path(branch_id, &mut all_branch_ids); - - let num_ports = self.ports.num_ports(); - let approximate_peers = num_ports; - let mut initial_solution_port_mapping = Vec::with_capacity(num_ports); - for port_idx in 0..self.ports.num_ports() { - let port_id = self.ports.get_port_id(port_idx); - let port_desc = self.ports.get_port(branch_id.index, port_idx); - - // Note: if assigned then we expect a valid branch ID. Otherwise we have the "invalid - // branch" as ID, marking that we want it to be silent - debug_assert!(port_desc.is_assigned == port_desc.last_registered_branch_id.is_valid()); - initial_solution_port_mapping.push((port_id, port_desc.last_registered_branch_id)); - } - - let initial_local_solution = SyncConnectorSolution{ - connector_id: comp_ctx.id, - terminating_branch_id: branch_id, - execution_branch_ids: all_branch_ids, - final_port_mapping: initial_solution_port_mapping, - }; - - let mut sync_message = SyncMessage::new(initial_local_solution, approximate_peers); - - // Turn local port mapping into constraints on other connectors - - // - constraints on other components due to transferred ports - for port_delta in &branch.ports_delta { - // For transferred ports we always have two constraints: one for the - // sender and one for the receiver, ensuring it was not used. - // TODO: This will fail if a port is passed around multiple times. - // maybe a special "passed along" entry in `ports_delta`. - if !sync_message.check_constraint(comp_ctx.id, SyncBranchConstraint::SilentPort(port_delta.port.self_id)).unwrap() { - return None; - } - - // Might need to check if we own the other side of the channel - let port = comp_ctx.get_port_by_id(port_delta.port.self_id).unwrap(); - if !sync_message.add_or_check_constraint(port.peer_connector, SyncBranchConstraint::SilentPort(port.peer_id)).unwrap() { - return None; - } - } - - // - constraints on other components due to owned ports - for port_index in 0..self.ports.num_ports() { - let port_id = self.ports.get_port_id(port_index); - let port_mapping = self.ports.get_port(branch_id.index, port_index); - let port = comp_ctx.get_port_by_id(port_id).unwrap(); - - let constraint = if port_mapping.is_assigned { - if port.kind == PortKind::Getter { - SyncBranchConstraint::BranchNumber(port_mapping.last_registered_branch_id) - } else { - SyncBranchConstraint::PortMapping(port.peer_id, port_mapping.last_registered_branch_id) - } - } else { - SyncBranchConstraint::SilentPort(port.peer_id) - }; - - if !sync_message.add_or_check_constraint(port.peer_connector, constraint).unwrap() { - return None; - } - } - - return Some(sync_message); - } - - fn submit_sync_solution(&mut self, partial_solution: SyncMessage, comp_ctx: &mut ComponentCtxFancy) { - if partial_solution.to_visit.is_empty() { - // Solution is completely consistent. So ask everyone to commit - // TODO: Maybe another package for random? 
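// Aside: the same eight random bytes can be turned into a u64 without the
// `unsafe` transmute used below, e.g. (assuming the getrandom crate that is
// already in use here):
//
//     let mut bytes = [0u8; 8];
//     getrandom::getrandom(&mut bytes).unwrap();
//     let comparison_number = u64::from_ne_bytes(bytes);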
- let comparison_number: u64 = unsafe { - let mut random_array = [0u8; 8]; - getrandom::getrandom(&mut random_array).unwrap(); - std::mem::transmute(random_array) - }; - - let num_local = partial_solution.local_solutions.len(); - - let mut full_solution = SolutionMessage{ - comparison_number, - connector_origin: comp_ctx.id, - local_solutions: Vec::with_capacity(num_local), - to_visit: Vec::with_capacity(num_local - 1), - }; - - for local_solution in &partial_solution.local_solutions { - full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id)); - if local_solution.connector_id != comp_ctx.id { - full_solution.to_visit.push(local_solution.connector_id); - } - } - - debug_assert!(self.committed_to.is_none()); - self.committed_to = Some((full_solution.connector_origin, full_solution.comparison_number)); - comp_ctx.submit_message(MessageContents::RequestCommit(full_solution)); - } else { - // Still have connectors to visit - comp_ctx.submit_message(MessageContents::Sync(partial_solution)); - } - } - - fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec) { - debug_assert!(parents.is_empty()); - - let mut next_branch_id = leaf_branch_id; - debug_assert!(next_branch_id.is_valid()); - - while next_branch_id.is_valid() { - parents.push(next_branch_id); - let branch = &self.branches[next_branch_id.index as usize]; - next_branch_id = branch.parent_index; - } - } -} - -#[derive(Eq, PartialEq)] -pub(crate) enum ConnectorScheduling { - Immediate, // Run again, immediately - Later, // Schedule for running, at some later point in time - NotNow, // Do not reschedule for running - Exit, // Connector has exited -} - -/// Recursively goes through the value group, attempting to find ports. -/// Duplicates will only be added once. -pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { - // Helper to check a value for a port and recurse if needed. - fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { - match value { - Value::Input(port_id) | Value::Output(port_id) => { - // This is an actual port - let cur_port = PortIdLocal::new(port_id.0.u32_suffix); - for prev_port in ports.iter() { - if *prev_port == cur_port { - // Already added - return; - } - } - - ports.push(cur_port); - }, - Value::Array(heap_pos) | - Value::Message(heap_pos) | - Value::String(heap_pos) | - Value::Struct(heap_pos) | - Value::Union(_, heap_pos) => { - // Reference to some dynamic thing which might contain ports, - // so recurse - let heap_region = &group.regions[*heap_pos as usize]; - for embedded_value in heap_region { - find_port_in_value(group, embedded_value, ports); - } - }, - _ => {}, // values we don't care about - } - } - - // Clear the ports, then scan all the available values - ports.clear(); - for value in &value_group.values { - find_port_in_value(value_group, value, ports); + ctx.notify_sync_end(&[]); } } \ No newline at end of file