diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index 80a2a191b1a8f0a8eaed8f68b46fd08309d1bd2d..1b296960bcfabfc69b0176b84cd9b13b012e6bd3 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -3,7 +3,8 @@ use std::ops::{Index, IndexMut}; use crate::protocol::ComponentState; use crate::protocol::eval::{Value, ValueGroup}; -use crate::runtime2::port::PortIdLocal; + +use super::port::PortIdLocal; /// Generic branch ID. A component will always have one branch: the /// non-speculative branch. This branch has ID 0. Hence in a speculative context diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index e72928313e678585399b6e2ae593b7d25a7d14dd..f60d038c8d9816830c301122a06d6ef70c5896dd 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -1,315 +1,46 @@ +// connector.rs +// +// Represents a component. A component (and the scheduler that is running it) +// has many properties that are not easy to subdivide into aspects that are +// conceptually handled by particular data structures. That is to say: the code +// that we run governs: running PDL code, keeping track of ports, instantiating +// new components and transports (i.e. interacting with the runtime), running +// a consensus algorithm, etc. But on the other hand, our data is rather +// simple: we have a speculative execution tree, a set of ports that we own, +// and a bit of code that we should run. +// +// So currently the code is organized as following: +// - The scheduler that is running the component is the authoritative source on +// ports during *non-sync* mode. The consensus algorithm is the +// authoritative source during *sync* mode. They retrieve each other's +// state during the transitions. Hence port data exists duplicated between +// these two datastructures. +// - The execution tree is where executed branches reside. But the execution +// tree is only aware of the tree shape itself (and keeps track of some +// queues of branches that are in a particular state), and tends to store +// the PDL program state. The consensus algorithm is also somewhat aware +// of the execution tree, but only in terms of what is needed to complete +// a sync round (for now, that means the port mapping in each branch). +// Hence once more we have properties conceptually associated with branches +// in two places. +// - TODO: Write about handling messages, consensus wrapping data +// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx + use std::collections::HashMap; use std::sync::atomic::AtomicBool; use crate::PortId; -use crate::protocol::{ComponentState, RunContext, RunResult}; +use crate::common::ComponentState; use crate::protocol::eval::{Prompt, Value, ValueGroup}; +use crate::protocol::{RunContext, RunResult}; -use super::ConnectorId; +use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; +use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; +use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox}; use super::native::Connector; -use super::scheduler::{ - SchedulerCtx, ComponentCtxFancy, ComponentPortChange, - ReceivedMessage -}; -use super::inbox::{ - PublicInbox, - DataMessage, SyncMessage, SolutionMessage, MessageContents, - SyncBranchConstraint, SyncConnectorSolution -}; use super::port::{PortKind, PortIdLocal}; +use super::scheduler::{ComponentCtx, SchedulerCtx}; -/// Represents the identifier of a branch (the index within its container). An -/// ID of `0` generally means "no branch" (e.g. no parent, or a port did not -/// yet receive anything from any branch). -// TODO: Remove Debug derive -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct BranchId { - pub index: u32, -} - -impl BranchId { - fn new_invalid() -> Self { - Self{ index: 0 } - } - - fn new(index: u32) -> Self { - debug_assert!(index != 0); - Self{ index } - } - - #[inline] - pub(crate) fn is_valid(&self) -> bool { - return self.index != 0; - } -} - -#[derive(Debug, PartialEq, Eq)] -pub(crate) enum SpeculativeState { - // Non-synchronous variants - RunningNonSync, // regular execution of code - Error, // encountered a runtime error - Finished, // finished executing connector's code - // Synchronous variants - RunningInSync, // running within a sync block - HaltedAtBranchPoint, // at a branching point (at a `get` call) - ReachedSyncEnd, // reached end of sync block, branch represents a local solution - Inconsistent, // branch can never represent a local solution, so halted -} - -pub(crate) struct Branch { - index: BranchId, - parent_index: BranchId, - // Code execution state - code_state: ComponentState, - prepared_channel: Option<(Value, Value)>, - sync_state: SpeculativeState, - halted_at_port: PortIdLocal, // invalid if not halted - next_branch_in_queue: Option, - // Message/port state - received: HashMap, // TODO: @temporary, remove together with fires() - ports_delta: Vec, -} - -impl Branch { - /// Constructs a non-sync branch. It is assumed that the code is at the - /// first instruction - pub(crate) fn new_initial_branch(component_state: ComponentState) -> Self { - Branch{ - index: BranchId::new_invalid(), - parent_index: BranchId::new_invalid(), - code_state: component_state, - prepared_channel: None, - sync_state: SpeculativeState::RunningNonSync, - halted_at_port: PortIdLocal::new_invalid(), - next_branch_in_queue: None, - received: HashMap::new(), - ports_delta: Vec::new(), - } - } - - /// Constructs a sync branch. The provided branch is assumed to be the - /// parent of the new branch within the execution tree. - fn new_sync_branching_from(new_index: u32, parent_branch: &Branch) -> Self { - debug_assert!( - (parent_branch.sync_state == SpeculativeState::RunningNonSync && !parent_branch.parent_index.is_valid()) || - (parent_branch.sync_state == SpeculativeState::HaltedAtBranchPoint) - ); - debug_assert!(parent_branch.prepared_channel.is_none()); - - Branch{ - index: BranchId::new(new_index), - parent_index: parent_branch.index, - code_state: parent_branch.code_state.clone(), - prepared_channel: None, - sync_state: SpeculativeState::RunningInSync, - halted_at_port: PortIdLocal::new_invalid(), - next_branch_in_queue: None, - received: parent_branch.received.clone(), - ports_delta: parent_branch.ports_delta.clone(), - } - } - - fn commit_to_sync(&mut self) { - // Logically impossible conditions (because we have a finished branch - // we are going to commit to) - debug_assert!(self.prepared_channel.is_none()); - debug_assert!(!self.halted_at_port.is_valid()); - - // Reset other variables to their defaults - self.index = BranchId::new_invalid(); - self.parent_index = BranchId::new_invalid(); - self.sync_state = SpeculativeState::RunningNonSync; - self.next_branch_in_queue = None; - self.received.clear(); - self.ports_delta.clear(); - } -} - -#[derive(Clone)] -struct PortAssignment { - is_assigned: bool, - last_registered_branch_id: BranchId, // invalid branch ID implies not assigned yet - num_times_fired: u32, -} - -impl PortAssignment { - fn new_unassigned() -> Self { - Self{ - is_assigned: false, - last_registered_branch_id: BranchId::new_invalid(), - num_times_fired: 0, - } - } - - #[inline] - fn mark_speculative(&mut self, num_times_fired: u32) { - debug_assert!(!self.last_registered_branch_id.is_valid()); - self.is_assigned = true; - self.num_times_fired = num_times_fired; - } - - #[inline] - fn mark_definitive(&mut self, branch_id: BranchId, num_times_fired: u32) { - self.is_assigned = true; - self.last_registered_branch_id = branch_id; - self.num_times_fired = num_times_fired; - } -} - -#[derive(Debug)] -enum PortOwnershipError { - UsedInInteraction(PortIdLocal), - AlreadyGivenAway(PortIdLocal) -} - -/// Contains a description of the port mapping during a particular sync session. -/// TODO: Extend documentation -pub(crate) struct ConnectorPorts { - // Essentially a mapping from `port_index` to `port_id`. - pub owned_ports: Vec, - // Contains P*B entries, where P is the number of ports and B is the number - // of branches. One can find the appropriate mapping of port p at branch b - // at linear index `b*P+p`. - port_mapping: Vec -} - -impl ConnectorPorts { - /// Constructs the initial ports object. Assumes the presence of the - /// non-sync branch at index 0. Will initialize all entries for the non-sync - /// branch. - fn new(owned_ports: Vec) -> Self { - let num_ports = owned_ports.len(); - let mut port_mapping = Vec::with_capacity(num_ports); - for _ in 0..num_ports { - port_mapping.push(PortAssignment::new_unassigned()); - } - - Self{ owned_ports, port_mapping } - } - - /// Prepares the port mapping for a new branch. Assumes that there is no - /// intermediate branch index that we have skipped. - fn prepare_sync_branch(&mut self, parent_branch_idx: u32, new_branch_idx: u32) { - let num_ports = self.owned_ports.len(); - let parent_base_idx = parent_branch_idx as usize * num_ports; - let new_base_idx = new_branch_idx as usize * num_ports; - - debug_assert!(parent_branch_idx < new_branch_idx); - debug_assert!(new_base_idx == self.port_mapping.len()); - - self.port_mapping.reserve(num_ports); - for offset in 0..num_ports { - let parent_port = &self.port_mapping[parent_base_idx + offset]; - let parent_port = parent_port.clone(); - self.port_mapping.push(parent_port); - } - } - - /// Adds a new port. Caller must make sure that the connector is not in the - /// sync phase. - fn add_port(&mut self, port_id: PortIdLocal) { - debug_assert!(self.port_mapping.len() == self.owned_ports.len()); - debug_assert!(!self.owned_ports.contains(&port_id)); - self.owned_ports.push(port_id); - self.port_mapping.push(PortAssignment::new_unassigned()); - } - - /// Commits to a particular branch. Essentially just removes the port - /// mapping information generated during the sync phase. - fn commit_to_sync(&mut self) { - self.port_mapping.truncate(self.owned_ports.len()); - debug_assert!(self.port_mapping.iter().all(|v| { - !v.is_assigned && !v.last_registered_branch_id.is_valid() - })); - } - - /// Removes a particular port from the connector. May only be done if the - /// connector is in non-sync mode - fn remove_port(&mut self, port_id: PortIdLocal) { - debug_assert!(self.port_mapping.len() == self.owned_ports.len()); // in non-sync mode - let port_index = self.get_port_index(port_id).unwrap(); - self.owned_ports.remove(port_index); - self.port_mapping.remove(port_index); - } - - /// Retrieves the index associated with a port id. Note that the port might - /// not exist (yet) if a speculative branch has just received the port. - /// TODO: But then again, one cannot use that port, right? - #[inline] - fn get_port_index(&self, port_id: PortIdLocal) -> Option { - for (idx, port) in self.owned_ports.iter().enumerate() { - if *port == port_id { - return Some(idx) - } - } - - return None - } - - /// Retrieves the ID associated with the port at the provided index - #[inline] - fn get_port_id(&self, port_index: usize) -> PortIdLocal { - return self.owned_ports[port_index]; - } - - #[inline] - fn get_port(&self, branch_idx: u32, port_idx: usize) -> &PortAssignment { - let mapped_idx = self.mapped_index(branch_idx, port_idx); - return &self.port_mapping[mapped_idx]; - } - - #[inline] - fn get_port_mut(&mut self, branch_idx: u32, port_idx: usize) -> &mut PortAssignment { - let mapped_idx = self.mapped_index(branch_idx, port_idx); - return &mut self.port_mapping[mapped_idx]; - } - - #[inline] - fn num_ports(&self) -> usize { - return self.owned_ports.len(); - } - - - // Function for internal use: retrieve index in flattened port mapping array - // based on branch/port index. - #[inline] - fn mapped_index(&self, branch_idx: u32, port_idx: usize) -> usize { - let branch_idx = branch_idx as usize; - let num_ports = self.owned_ports.len(); - - debug_assert!(port_idx < num_ports); - debug_assert!((branch_idx + 1) * num_ports <= self.port_mapping.len()); - - return branch_idx * num_ports + port_idx; - } -} - -struct BranchQueue { - first: u32, - last: u32, -} - -impl BranchQueue { - #[inline] - fn new() -> Self { - Self{ first: 0, last: 0 } - } - - #[inline] - fn is_empty(&self) -> bool { - debug_assert!((self.first == 0) == (self.last == 0)); - return self.first == 0; - } - - #[inline] - fn clear(&mut self) { - self.first = 0; - self.last = 0; - } -} - -/// Public fields of the connector that can be freely shared between multiple -/// threads. pub(crate) struct ConnectorPublic { pub inbox: PublicInbox, pub sleeping: AtomicBool, @@ -324,65 +55,46 @@ impl ConnectorPublic { } } -// TODO: Maybe prevent false sharing by aligning `public` to next cache line. -// TODO: Do this outside of the connector, create a wrapping struct +#[derive(Eq, PartialEq)] +pub(crate) enum ConnectorScheduling { + Immediate, // Run again, immediately + Later, // Schedule for running, at some later point in time + NotNow, // Do not reschedule for running + Exit, // Connector has exited +} + pub(crate) struct ConnectorPDL { - // Branch management - branches: Vec, // first branch is always non-speculative one - sync_active: BranchQueue, - sync_pending_get: BranchQueue, - sync_finished: BranchQueue, - sync_finished_last_handled: u32, // TODO: Change to BranchId? - cur_round: u32, - // Port/message management - pub committed_to: Option<(ConnectorId, u64)>, - pub ports: ConnectorPorts, + tree: ExecTree, + consensus: Consensus, } -// TODO: Remove this monstrosity struct ConnectorRunContext<'a> { - branch_index: u32, - ports: &'a ConnectorPorts, - ports_delta: &'a Vec, - received: &'a HashMap, + branch_id: BranchId, + consensus: &'a Consensus, + received: &'a HashMap, scheduler: SchedulerCtx<'a>, prepared_channel: Option<(Value, Value)>, } -impl<'a> RunContext for ConnectorRunContext<'a> { +impl<'a> RunContext for ConnectorRunContext<'a>{ fn did_put(&mut self, port: PortId) -> bool { - if self.ports_delta.iter().any(|v| v.port.self_id.index == port.0.u32_suffix) { - // Either acquired or released, must be silent - return false; - } - - let port_index = self.ports.get_port_index(PortIdLocal::new(port.0.u32_suffix)).unwrap(); - let mapping = self.ports.get_port(self.branch_index, port_index); - return mapping.is_assigned; + let port_id = PortIdLocal::new(port.0.u32_suffix); + let annotation = self.consensus.get_annotation(self.branch_id, port_id); + return annotation.registered_id.is_some(); } fn get(&mut self, port: PortId) -> Option { let port_id = PortIdLocal::new(port.0.u32_suffix); match self.received.get(&port_id) { - Some(message) => Some(message.message.clone()), + Some(data) => Some(data.clone()), None => None, } } fn fires(&mut self, port: PortId) -> Option { let port_id = PortIdLocal::new(port.0.u32_suffix); - if self.ports_delta.iter().any(|v| v.port.self_id == port_id) { - return None - } - - let port_index = self.ports.get_port_index(port_id).unwrap(); - let mapping = self.ports.get_port(self.branch_index, port_index); - - if mapping.is_assigned { - return Some(Value::Bool(mapping.num_times_fired != 0)); - } else { - return None; - } + let annotation = self.consensus.get_annotation(self.branch_id, port_id); + return annotation.expected_firing.map(|v| Value::Bool(v)); } fn get_channel(&mut self) -> Option<(Value, Value)> { @@ -391,57 +103,16 @@ impl<'a> RunContext for ConnectorRunContext<'a> { } impl Connector for ConnectorPDL { - fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { self.handle_new_messages(comp_ctx); - if comp_ctx.is_in_sync() { - let scheduling = self.run_in_speculative_mode(sched_ctx, comp_ctx); - - // When in speculative mode we might have generated new sync - // solutions, we need to turn them into proposed solutions here. - if self.sync_finished_last_handled != self.sync_finished.last { - // Retrieve first element in queue - let mut next_id; - if self.sync_finished_last_handled == 0 { - next_id = self.sync_finished.first; - } else { - let last_handled = &self.branches[self.sync_finished_last_handled as usize]; - debug_assert!(last_handled.next_branch_in_queue.is_some()); // because "last handled" != "last in queue" - next_id = last_handled.next_branch_in_queue.unwrap(); - } - - loop { - let branch_id = BranchId::new(next_id); - let branch = &self.branches[next_id as usize]; - let branch_next = branch.next_branch_in_queue; - - // Turn local solution into a message and send it along - // TODO: Like `ports` access, also revise the construction of this `key`, should not be needed - let solution_message = self.generate_initial_solution_for_branch(branch_id, comp_ctx); - if let Some(valid_solution) = solution_message { - self.submit_sync_solution(valid_solution, comp_ctx); - } else { - // Branch is actually invalid, but we only just figured - // it out. We need to mark it as invalid to prevent - // future use - Self::remove_branch_from_queue(&mut self.branches, &mut self.sync_finished, branch_id); - if branch_id.index == self.sync_finished_last_handled { - self.sync_finished_last_handled = self.sync_finished.last; - } - - let branch = &mut self.branches[next_id as usize]; - branch.sync_state = SpeculativeState::Inconsistent; - } - - match branch_next { - Some(id) => next_id = id, - None => break, - } - } - - self.sync_finished_last_handled = next_id; + if self.tree.is_in_sync() { + let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); + if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { + self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); + return ConnectorScheduling::Immediate; + } else { + return scheduling } - - return scheduling; } else { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); return scheduling; @@ -450,612 +121,241 @@ impl Connector for ConnectorPDL { } impl ConnectorPDL { - /// Constructs a representation of a connector. The assumption is that the - /// initial branch is at the first instruction of the connector's code, - /// hence is in a non-sync state. - pub fn new(initial_branch: Branch, owned_ports: Vec) -> Self { + pub fn new(initial: ComponentState) -> Self { Self{ - branches: vec![initial_branch], - sync_active: BranchQueue::new(), - sync_pending_get: BranchQueue::new(), - sync_finished: BranchQueue::new(), - sync_finished_last_handled: 0, // none at all - cur_round: 0, - committed_to: None, - ports: ConnectorPorts::new(owned_ports), + tree: ExecTree::new(initial), + consensus: Consensus::new(), } } - // ------------------------------------------------------------------------- - // Handling connector messages - // ------------------------------------------------------------------------- + // --- Handling messages - pub fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtxFancy) { - while let Some(message) = comp_ctx.read_next_message() { + pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) { + while let Some(message) = ctx.read_next_message() { match message { - ReceivedMessage::Data((target_port_id, contents)) => { - self.handle_data_message(target_port_id, &contents); - }, - ReceivedMessage::Sync(contents) => { - self.handle_sync_message(contents, comp_ctx); - }, - ReceivedMessage::RequestCommit(contents) => { - self.handle_request_commit_message(contents, comp_ctx); - }, - ReceivedMessage::ConfirmCommit(contents) => { - self.handle_confirm_commit_message(contents, comp_ctx); - }, - } - } - } - - pub fn handle_data_message(&mut self, target_port_id: PortIdLocal, message: &DataMessage) { - // Go through all branches that are waiting for a message - let mut branch_idx = self.sync_pending_get.first; - while branch_idx != 0 { - let branch = &self.branches[branch_idx as usize]; - let next_branch_idx = branch.next_branch_in_queue.unwrap_or(0); - - let target_port_index = self.ports.get_port_index(target_port_id).unwrap(); - let port_mapping = self.ports.get_port(branch_idx, target_port_index); - - // Check if the branch may accept the message - if branch.sync_state == SpeculativeState::HaltedAtBranchPoint && - branch.halted_at_port == target_port_id && - port_mapping.last_registered_branch_id == message.sender_prev_branch_id - { - // Branch can accept. So fork it, and let the fork accept the - // message. The original branch stays waiting for new messages. - let new_branch_idx = self.branches.len() as u32; - let new_branch = Branch::new_sync_branching_from(new_branch_idx, branch); - - self.ports.prepare_sync_branch(branch_idx, new_branch_idx); - let mapping = self.ports.get_port_mut(branch_idx, target_port_index); - mapping.last_registered_branch_id = message.sender_cur_branch_id; - - let new_branch_id = BranchId::new(new_branch_idx); - self.branches.push(new_branch); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id) + Message::Data(message) => self.handle_new_data_message(message, ctx), + Message::Sync(message) => self.handle_new_sync_message(message, ctx), + Message::Control(_) => unreachable!("control message in component"), } - - branch_idx = next_branch_idx; } } - /// Accepts a synchronous message and combines it with the locally stored - /// solution(s). Then queue new `Sync`/`Solution` messages when appropriate. - pub fn handle_sync_message(&mut self, message: SyncMessage, comp_ctx: &mut ComponentCtxFancy) { - debug_assert!(!message.to_visit.contains(&comp_ctx.id)); // own ID already removed - debug_assert!(message.constraints.iter().any(|v| v.connector_id == comp_ctx.id)); // we have constraints - - // TODO: Optimize, use some kind of temp workspace vector - let mut execution_path_branch_ids = Vec::new(); - - if self.sync_finished_last_handled != 0 { - // We have some solutions to match against - let constraints_index = message.constraints - .iter() - .position(|v| v.connector_id == comp_ctx.id) - .unwrap(); - let constraints = &message.constraints[constraints_index].constraints; - debug_assert!(!constraints.is_empty()); - - // Note that we only iterate over the solutions we've already - // handled ourselves, not necessarily - let mut branch_index = self.sync_finished.first; - 'branch_loop: loop { - // Load solution branch - let branch = &self.branches[branch_index as usize]; - execution_path_branch_ids.clear(); - self.branch_ids_of_execution_path(BranchId::new(branch_index), &mut execution_path_branch_ids); - - // Check if the branch matches all of the applied constraints - for constraint in constraints { - match constraint { - SyncBranchConstraint::SilentPort(silent_port_id) => { - let port_index = self.ports.get_port_index(*silent_port_id); - if port_index.is_none() { - // Nefarious peer - continue 'branch_loop; - } - let port_index = port_index.unwrap(); - - let mapping = self.ports.get_port(branch_index, port_index); - debug_assert!(mapping.is_assigned); - - if mapping.num_times_fired != 0 { - // Not silent, constraint not satisfied - continue 'branch_loop; - } - }, - SyncBranchConstraint::BranchNumber(expected_branch_id) => { - if !execution_path_branch_ids.contains(expected_branch_id) { - // Not the expected execution path, constraint not satisfied - continue 'branch_loop; - } - }, - SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => { - let port_index = self.ports.get_port_index(*port_id); - if port_index.is_none() { - // Nefarious peer - continue 'branch_loop; - } - let port_index = port_index.unwrap(); - - let mapping = self.ports.get_port(branch_index, port_index); - if mapping.last_registered_branch_id != *expected_branch_id { - // Not the expected interaction on this port, constraint not satisfied - continue 'branch_loop; - } - }, - } - } - - // If here, then all of the external constraints were satisfied - // for the current branch. But the branch itself also imposes - // constraints. So while building up the new solution, make sure - // that those are satisfied as well. - // TODO: Code below can probably be merged with initial solution - // generation. + pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { + // Go through all branches that are awaiting new messages and see if + // there is one that can receive this message. + debug_assert!(ctx.workspace_branches.is_empty()); + let mut branches = Vec::new(); // TODO: @Remove + self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches); - // - clone old solution so we can add to it - let mut new_solution = message.clone(); - - // - determine the initial port mapping - let num_ports = self.ports.num_ports(); - let mut new_solution_mapping = Vec::with_capacity(num_ports); - for port_index in 0..self.ports.num_ports() { - let port_id = self.ports.get_port_id(port_index); - let mapping = self.ports.get_port(branch_index, port_index); - new_solution_mapping.push((port_id, mapping.last_registered_branch_id)); - } - - // - replace constraints with a local solution - new_solution.constraints.remove(constraints_index); - new_solution.local_solutions.push(SyncConnectorSolution{ - connector_id: comp_ctx.id, - terminating_branch_id: BranchId::new(branch_index), - execution_branch_ids: execution_path_branch_ids.clone(), - final_port_mapping: new_solution_mapping, - }); - - // - do a second pass on the ports to generate and add the - // constraints that should be applied to other connectors - for port_index in 0..self.ports.num_ports() { - let port_id = self.ports.get_port_id(port_index); - - let (peer_connector_id, peer_port_id, peer_is_getter) = { - let port = comp_ctx.get_port_by_id(port_id).unwrap(); - (port.peer_connector, port.peer_id, port.kind == PortKind::Putter) - }; - - let mapping = self.ports.get_port(branch_index, port_index); - let constraint = if mapping.num_times_fired == 0 { - SyncBranchConstraint::SilentPort(peer_port_id) - } else { - if peer_is_getter { - SyncBranchConstraint::PortMapping(peer_port_id, mapping.last_registered_branch_id) - } else { - SyncBranchConstraint::BranchNumber(mapping.last_registered_branch_id) - } - }; - - match new_solution.add_or_check_constraint(peer_connector_id, constraint) { - Err(_) => continue 'branch_loop, - Ok(false) => continue 'branch_loop, - Ok(true) => {}, - } - } + for branch_id in branches.drain(..) { + // This branch can receive, so fork and given it the message + let receiving_branch_id = self.tree.fork_branch(branch_id); + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + let receiving_branch = &mut self.tree[receiving_branch_id]; - // If here, then the newly generated solution is completely - // compatible. - let next_branch = branch.next_branch_in_queue; - self.submit_sync_solution(new_solution, comp_ctx); + receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); + self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); - // Consider the next branch - if branch_index == self.sync_finished_last_handled { - // At the end of the previously handled solutions - break; - } - - debug_assert!(next_branch.is_some()); // because we cannot be at the end of the queue - branch_index = next_branch.unwrap(); - } + // And prepare the branch for running + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); } } - fn handle_request_commit_message(&mut self, mut message: SolutionMessage, comp_ctx: &mut ComponentCtxFancy) { - let should_propagate_message = match &self.committed_to { - Some((previous_origin, previous_comparison)) => { - // Already committed to something. So will commit to this if it - // takes precedence over the current solution - message.comparison_number > *previous_comparison || - (message.comparison_number == *previous_comparison && message.connector_origin.0 > previous_origin.0) - }, - None => { - // Not yet committed to a solution, so commit to this one - true - } - }; - - if should_propagate_message { - self.committed_to = Some((message.connector_origin, message.comparison_number)); - - if message.to_visit.is_empty() { - // Visited all of the connectors, so every connector can now - // apply the solution - // TODO: Use temporary workspace - let mut to_visit = Vec::with_capacity(message.local_solutions.len() - 1); - for (connector_id, _) in &message.local_solutions { - if *connector_id != comp_ctx.id { - to_visit.push(*connector_id); - } - } - - message.to_visit = to_visit; - comp_ctx.submit_message(MessageContents::ConfirmCommit(message.clone())); - self.handle_confirm_commit_message(message, comp_ctx); - } else { - // Not yet visited all of the connectors - comp_ctx.submit_message(MessageContents::RequestCommit(message)); - } + pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { + if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { + self.collapse_sync_to_solution_branch(solution_branch_id, ctx); } } - fn handle_confirm_commit_message(&mut self, message: SolutionMessage, comp_ctx: &mut ComponentCtxFancy) { - // Make sure this is the message we actually committed to. As long as - // we're running on a single machine this is fine. - // TODO: Take care of nefarious peers - let (expected_connector_id, expected_comparison_number) = - self.committed_to.unwrap(); - assert_eq!(message.connector_origin, expected_connector_id); - assert_eq!(message.comparison_number, expected_comparison_number); - - // Find the branch we're supposed to commit to - let (_, branch_id) = message.local_solutions - .iter() - .find(|(id, _)| *id == comp_ctx.id) - .unwrap(); - let branch_id = *branch_id; - - // Commit to the branch. That is: move the solution branch to the first - // of the connector's branches - self.branches.swap(0, branch_id.index as usize); - self.branches.truncate(1); // TODO: Or drain and do not deallocate? - let solution_branch = &mut self.branches[0]; + // --- Running code - // Clear all of the other sync-related variables - self.sync_active.clear(); - self.sync_pending_get.clear(); - self.sync_finished.clear(); - self.sync_finished_last_handled = 0; - self.cur_round += 1; - - self.committed_to = None; - self.ports.commit_to_sync(); - - // Add/remove any of the ports we lost during the sync phase - // TODO: Probably might not need this with the port syncing - for port_delta in &solution_branch.ports_delta { - if port_delta.is_acquired { - self.ports.add_port(port_delta.port.self_id); - } else { - self.ports.remove_port(port_delta.port.self_id); - } - } - - comp_ctx.notify_sync_end(&solution_branch.ports_delta); - solution_branch.commit_to_sync(); - } - - // ------------------------------------------------------------------------- - // Executing connector code - // ------------------------------------------------------------------------- - - /// Runs the connector in synchronous mode. Potential changes to the global - /// system's state are added to the `RunDeltaState` object by the connector, - /// where it is the caller's responsibility to immediately take care of - /// those changes. The return value indicates when (and if) the connector - /// needs to be scheduled again. - pub fn run_in_speculative_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { - debug_assert!(comp_ctx.is_in_sync()); - - if self.sync_active.is_empty() { + pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + // Check if we have any branch that needs running + debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync()); + let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); + if branch_id.is_none() { return ConnectorScheduling::NotNow; } - let branch = Self::pop_branch_from_queue(&mut self.branches, &mut self.sync_active); + // Retrieve the branch and run it + let branch_id = branch_id.unwrap(); + let branch = &mut self.tree[branch_id]; - // Run the branch to the next blocking point - debug_assert!(branch.prepared_channel.is_none()); - let mut run_context = ConnectorRunContext { - branch_index: branch.index.index, - ports: &self.ports, - ports_delta: &branch.ports_delta, + let mut run_context = ConnectorRunContext{ + branch_id, + consensus: &self.consensus, + received: &branch.inbox, scheduler: sched_ctx, - prepared_channel: None, - received: &branch.received, + prepared_channel: branch.prepared_channel.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); - // Match statement contains `return` statements only if the particular - // run result behind handled requires an immediate re-run of the - // connector. + // Handle the returned result. Note that this match statement contains + // explicit returns in case the run result requires that the component's + // code is ran again immediately match run_result { RunResult::BranchInconsistent => { - // Speculative branch became inconsistent + // Branch became inconsistent branch.sync_state = SpeculativeState::Inconsistent; }, RunResult::BranchMissingPortState(port_id) => { - // Branch called `fires()` on a port that does not yet have an - // assigned speculative value. So we need to create those - // branches - let local_port_id = PortIdLocal::new(port_id.0.u32_suffix); - let local_port_index = self.ports.get_port_index(local_port_id).unwrap(); - - debug_assert!(self.ports.owned_ports.contains(&local_port_id)); + // Branch called `fires()` on a port that has not been used yet. + let port_id = PortIdLocal::new(port_id.0.u32_suffix); - // Create two copied branches, one silent and one firing + // Create two forks, one that assumes the port will fire, and + // one that assumes the port remains silent branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - let parent_branch_id = branch.index; - let parent_branch = &self.branches[parent_branch_id.index as usize]; - - let silent_index = self.branches.len() as u32; - let firing_index = silent_index + 1; - - let silent_branch = Branch::new_sync_branching_from(silent_index, parent_branch); - self.ports.prepare_sync_branch(parent_branch.index.index, silent_index); - - let firing_branch = Branch::new_sync_branching_from(firing_index, parent_branch); - self.ports.prepare_sync_branch(parent_branch.index.index, firing_index); - // Assign the port values of the two new branches - let silent_port = self.ports.get_port_mut(silent_index, local_port_index); - silent_port.mark_speculative(0); + let firing_branch_id = self.tree.fork_branch(branch_id); + let silent_branch_id = self.tree.fork_branch(branch_id); + self.consensus.notify_of_new_branch(branch_id, firing_branch_id); + let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true); + debug_assert_eq!(_result, Consistency::Valid); + self.consensus.notify_of_new_branch(branch_id, silent_branch_id); + let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false); + debug_assert_eq!(_result, Consistency::Valid); - let firing_port = self.ports.get_port_mut(firing_index, local_port_index); - firing_port.mark_speculative(1); - - // Run both branches again - let silent_branch_id = silent_branch.index; - self.branches.push(silent_branch); - let firing_branch_id = firing_branch.index; - self.branches.push(firing_branch); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, silent_branch_id); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, firing_branch_id); + // Somewhat important: we push the firing one first, such that + // that branch is ran again immediately. + self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id); + self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id); return ConnectorScheduling::Immediate; }, RunResult::BranchMissingPortValue(port_id) => { - // Branch performed a `get` on a port that has not yet received - // a value in its inbox. - let local_port_id = PortIdLocal::new(port_id.0.u32_suffix); - let local_port_index = self.ports.get_port_index(local_port_id); - if local_port_index.is_none() { - todo!("deal with the case where the port is acquired"); - } - let local_port_index = local_port_index.unwrap(); - let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index); - - // Check for port mapping assignment and, if present, if it is - // consistent - let is_valid_get = if port_mapping.is_assigned { - assert!(port_mapping.num_times_fired <= 1); // temporary, until we get rid of `fires` - port_mapping.num_times_fired == 1 - } else { - // Not yet assigned - port_mapping.mark_speculative(1); - true - }; - - if is_valid_get { - // Mark as a branching point for future messages + // Branch performed a `get()` on a port that does not have a + // received message on that port. + let port_id = PortIdLocal::new(port_id.0.u32_suffix); + let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); + if consistency == Consistency::Valid { + // `get()` is valid, so mark the branch as awaiting a message branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - branch.halted_at_port = local_port_id; - let branch_id = branch.index; - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_pending_get, branch_id); - - // But if some messages can be immediately applied, do so - // now. - let messages = comp_ctx.get_read_data_messages(local_port_id, port_mapping.last_registered_branch_id); - let mut did_have_messages = false; - - for message in messages { - did_have_messages = true; - - // For each message prepare a new branch to execute - let parent_branch = &self.branches[branch_id.index as usize]; - let new_branch_index = self.branches.len() as u32; - let mut new_branch = Branch::new_sync_branching_from(new_branch_index, parent_branch); - self.ports.prepare_sync_branch(branch_id.index, new_branch_index); - - let port_mapping = self.ports.get_port_mut(new_branch_index, local_port_index); - port_mapping.last_registered_branch_id = message.sender_cur_branch_id; - debug_assert!(port_mapping.is_assigned && port_mapping.num_times_fired == 1); - - new_branch.received.insert(local_port_id, message.clone()); - - // If the message contains any ports then they will now - // be owned by the new branch - let mut transferred_ports = Vec::new(); // TODO: Create workspace somewhere - find_ports_in_value_group(&message.message, &mut transferred_ports); - Self::acquire_ports_during_sync(&mut self.ports, &mut new_branch, &transferred_ports); - - // Schedule the new branch - debug_assert!(new_branch.sync_state == SpeculativeState::RunningInSync); - let new_branch_id = new_branch.index; - self.branches.push(new_branch); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, new_branch_id); + branch.awaiting_port = port_id; + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + // Note: we only know that a branch is waiting on a message when + // it reaches the `get` call. But we might have already received + // a message that targets this branch, so check now. + let mut any_branch_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; + + branch.insert_message(port_id, message.content.as_message().unwrap().clone()); + + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + + any_branch_received = true; + } } - if did_have_messages { - // If we did create any new branches, then we can run - // them immediately. + if any_branch_received { return ConnectorScheduling::Immediate; } } else { branch.sync_state = SpeculativeState::Inconsistent; } - }, + } RunResult::BranchAtSyncEnd => { - // Branch is done, go through all of the ports that are not yet - // assigned and map them to non-firing. - for port_idx in 0..self.ports.num_ports() { - let port_mapping = self.ports.get_port_mut(branch.index.index, port_idx); - if !port_mapping.is_assigned { - port_mapping.mark_speculative(0); - } + let consistency = self.consensus.notify_of_finished_branch(branch_id); + if consistency == Consistency::Valid { + branch.sync_state = SpeculativeState::ReachedSyncEnd; + self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); + } else if consistency == Consistency::Inconsistent { + branch.sync_state = SpeculativeState::Inconsistent; } - - let branch_id = branch.index; - branch.sync_state = SpeculativeState::ReachedSyncEnd; - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_finished, branch_id); }, - RunResult::BranchPut(port_id, value_group) => { - // Branch performed a `put` on a particualar port. - let local_port_id = PortIdLocal{ index: port_id.0.u32_suffix }; - let local_port_index = self.ports.get_port_index(local_port_id); - if local_port_index.is_none() { - todo!("handle case where port was received before (i.e. in ports_delta)") - } - let local_port_index = local_port_index.unwrap(); - - // Check the port mapping for consistency - // TODO: For now we can only put once, so that simplifies stuff - let port_mapping = self.ports.get_port_mut(branch.index.index, local_port_index); - let is_valid_put = if port_mapping.is_assigned { - // Already assigned, so must be speculative and one time - // firing, otherwise we are `put`ing multiple times. - if port_mapping.last_registered_branch_id.is_valid() { - // Already did a `put` - todo!("handle error through RunDeltaState"); - } else { - // Valid if speculatively firing - port_mapping.num_times_fired == 1 - } - } else { - // Not yet assigned, do so now - true - }; - - if is_valid_put { - // Put in run results for thread to pick up and transfer to - // the correct connector inbox. - port_mapping.mark_definitive(branch.index, 1); - let message = DataMessage{ - sending_port: local_port_id, - sender_prev_branch_id: BranchId::new_invalid(), - sender_cur_branch_id: branch.index, - message: value_group, - }; - - // If the message contains any ports then we release our - // ownership over them in this branch - let mut transferred_ports = Vec::new(); // TODO: Put in some temp workspace - find_ports_in_value_group(&message.message, &mut transferred_ports); - Self::release_ports_during_sync(&mut self.ports, branch, &transferred_ports).unwrap(); - - comp_ctx.submit_message(MessageContents::Data(message)); - - let branch_index = branch.index; - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, branch_index); - return ConnectorScheduling::Immediate + RunResult::BranchPut(port_id, content) => { + // Branch is attempting to send data + let port_id = PortIdLocal::new(port_id.0.u32_suffix); + let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); + if consistency == Consistency::Valid { + // `put()` is valid. + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + comp_ctx.submit_message(Message::Data(DataMessage { + sync_header, data_header, + content: DataContent::Message(content), + })); + + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; } else { branch.sync_state = SpeculativeState::Inconsistent; } }, - _ => unreachable!("unexpected run result '{:?}' while running in sync mode", run_result), + _ => unreachable!("unexpected run result {:?} in sync mode", run_result), } - // Not immediately scheduling, so schedule again if there are more - // branches to run - if self.sync_active.is_empty() { + // If here then the run result did not require a particular action. We + // return whether we have more active branches to run or not. + if self.tree.queue_is_empty(QueueKind::Runnable) { return ConnectorScheduling::NotNow; } else { return ConnectorScheduling::Later; } } - /// Runs the connector in non-synchronous mode. - pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { - debug_assert!(!comp_ctx.is_in_sync()); - debug_assert!(self.sync_active.is_empty() && self.sync_pending_get.is_empty() && self.sync_finished.is_empty()); - debug_assert!(self.branches.len() == 1); + pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync()); - let branch = &mut self.branches[0]; + let branch = self.tree.base_branch_mut(); debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); let mut run_context = ConnectorRunContext{ - branch_index: branch.index.index, - ports: &self.ports, - ports_delta: &branch.ports_delta, + branch_id: branch.id, + consensus: &self.consensus, + received: &branch.inbox, scheduler: sched_ctx, prepared_channel: branch.prepared_channel.take(), - received: &branch.received, }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); match run_result { RunResult::ComponentTerminated => { - // Need to wait until all children are terminated - // TODO: Think about how to do this? branch.sync_state = SpeculativeState::Finished; + return ConnectorScheduling::Exit; }, RunResult::ComponentAtSyncStart => { - // Prepare for sync execution and reschedule immediately - // TODO: Not sure about this. I want a clear synchronization - // point between scheduler/component view on the ports. But is - // this the way to do it? - let current_ports = comp_ctx.notify_sync_start(); - for port in current_ports { - debug_assert!(self.ports.get_port_index(port.self_id).is_some()); - } + comp_ctx.notify_sync_start(); + let sync_branch_id = self.tree.start_sync(); + self.consensus.start_sync(comp_ctx); + self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); + self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); - let first_sync_branch = Branch::new_sync_branching_from(1, branch); - let first_sync_branch_id = first_sync_branch.index; - self.ports.prepare_sync_branch(0, 1); - self.branches.push(first_sync_branch); - Self::push_branch_into_queue(&mut self.branches, &mut self.sync_active, first_sync_branch_id); - - return ConnectorScheduling::Later; + return ConnectorScheduling::Immediate; }, RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { - // Construction of a new component. Find all references to ports - // inside of the arguments - let mut transferred_ports = Vec::new(); - find_ports_in_value_group(&arguments, &mut transferred_ports); - - if !transferred_ports.is_empty() { - // Ports changing ownership - if let Err(_) = Self::release_ports_during_non_sync(&mut self.ports, branch, &transferred_ports) { - todo!("fatal error handling"); - } - } + // Note: we're relinquishing ownership of ports. But because + // we are in non-sync mode the scheduler will handle and check + // port ownership transfer. + debug_assert!(comp_ctx.workspace_ports.is_empty()); + find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports); - // Add connector for later execution - let new_connector_state = ComponentState { + let new_state = ComponentState { prompt: Prompt::new( &sched_ctx.runtime.protocol_description.types, &sched_ctx.runtime.protocol_description.heap, definition_id, monomorph_idx, arguments - ) + ), }; - - let new_connector_branch = Branch::new_initial_branch(new_connector_state); - let new_connector = ConnectorPDL::new(new_connector_branch, transferred_ports); - - comp_ctx.push_component(new_connector); + let new_component = ConnectorPDL::new(new_state); + comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone()); + comp_ctx.workspace_ports.clear(); return ConnectorScheduling::Later; }, RunResult::NewChannel => { - // Need to prepare a new channel let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); - debug_assert_eq!(getter.kind, PortKind::Getter); + debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); branch.prepared_channel = Some(( Value::Input(PortId::new(putter.self_id.index)), - Value::Output(PortId::new(getter.self_id.index)) + Value::Output(PortId::new(getter.self_id.index)), )); comp_ctx.push_port(putter); @@ -1067,385 +367,16 @@ impl ConnectorPDL { } } - // ------------------------------------------------------------------------- - // Internal helpers - // ------------------------------------------------------------------------- - - // Helpers for management of the branches and their internally stored - // `next_branch_in_queue` and the `BranchQueue` objects. Essentially forming - // linked lists inside of the vector of branches. + pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) { + let mut fake_vec = Vec::new(); + self.tree.end_sync(solution_branch_id); + self.consensus.end_sync(solution_branch_id, &mut fake_vec); - /// Pops from front of linked-list branch queue. - fn pop_branch_from_queue<'a>(branches: &'a mut Vec, queue: &mut BranchQueue) -> &'a mut Branch { - debug_assert!(queue.first != 0); - let branch = &mut branches[queue.first as usize]; - queue.first = branch.next_branch_in_queue.unwrap_or(0); - branch.next_branch_in_queue = None; - - if queue.first == 0 { - // No more entries in queue - debug_assert_eq!(queue.last, branch.index.index); - queue.last = 0; + for port in fake_vec { + // TODO: Handle sent/received ports + debug_assert!(ctx.get_port_by_id(port).is_some()); } - return branch; - } - - /// Pushes branch at the end of the linked-list branch queue. - fn push_branch_into_queue( - branches: &mut Vec, queue: &mut BranchQueue, to_push: BranchId, - ) { - debug_assert!(to_push.is_valid()); - let to_push = to_push.index; - - if queue.last == 0 { - // No branches in the queue at all - debug_assert_eq!(queue.first, 0); - branches[to_push as usize].next_branch_in_queue = None; - queue.first = to_push; - queue.last = to_push; - } else { - // Pre-existing branch in the queue - debug_assert_ne!(queue.first, 0); - branches[queue.last as usize].next_branch_in_queue = Some(to_push); - queue.last = to_push; - } - } - - /// Removes branch from linked-list queue. Walks through the entire list to - /// find the element (!). Assumption is that this is not called often. - fn remove_branch_from_queue( - branches: &mut Vec, queue: &mut BranchQueue, to_delete: BranchId, - ) { - debug_assert!(to_delete.is_valid()); // we're deleting a valid item - debug_assert!(queue.first != 0 && queue.last != 0); // queue isn't empty to begin with - - // Retrieve branch and its next element - let branch_to_delete = &mut branches[to_delete.index as usize]; - let branch_next_index_option = branch_to_delete.next_branch_in_queue; - let branch_next_index_unwrapped = branch_next_index_option.unwrap_or(0); - branch_to_delete.next_branch_in_queue = None; - - // Walk through all elements in queue to find branch to delete - let mut prev_index = 0; - let mut next_index = queue.first; - - while next_index != 0 { - if next_index == to_delete.index { - // Found the element we're going to delete - // - check if at the first element or not - if prev_index == 0 { - queue.first = branch_next_index_unwrapped; - } else { - let prev_branch = &mut branches[prev_index as usize]; - prev_branch.next_branch_in_queue = branch_next_index_option; - } - - // - check if at last element or not (also takes care of "no elements left in queue") - if branch_next_index_option.is_none() { - queue.last = prev_index; - } - - return; - } - - prev_index = next_index; - let entry = &branches[next_index as usize]; - next_index = entry.next_branch_in_queue.unwrap_or(0); - } - - // If here, then we didn't find the element - panic!("branch does not exist in provided queue"); - } - - // Helpers for local port management. Specifically for adopting/losing - // ownership over ports, and for checking if specific ports can be sent - // over another port. - - /// Releasing ownership of ports while in non-sync mode. This only occurs - /// while instantiating new connectors - fn release_ports_during_non_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { - debug_assert!(!branch.index.is_valid()); // branch in non-sync mode - - for port_id in port_ids { - // We must own the port, or something is wrong with our code - todo!("Set up some kind of message router"); - debug_assert!(ports.get_port_index(*port_id).is_some()); - ports.remove_port(*port_id); - } - - return Ok(()) - } - - /// Releasing ownership of ports during a sync-session. Will provide an - /// error if the port was already used during a sync block. - fn release_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { - if port_ids.is_empty() { - return Ok(()) - } - - todo!("unfinished: add port properties during final solution-commit msgs"); - debug_assert!(branch.index.is_valid()); // branch in sync mode - - for port_id in port_ids { - match ports.get_port_index(*port_id) { - Some(port_index) => { - // We (used to) own the port. Make sure it is not given away - // already and not used to put/get data. - let port_mapping = ports.get_port(branch.index.index, port_index); - if port_mapping.is_assigned && port_mapping.num_times_fired != 0 { - // Already used - return Err(PortOwnershipError::UsedInInteraction(*port_id)); - } - - for delta in &branch.ports_delta { - if delta.port.self_id == *port_id { - // We cannot have acquired this port, because the - // call to `ports.get_port_index` returned an index. - debug_assert!(!delta.is_acquired); - return Err(PortOwnershipError::AlreadyGivenAway(*port_id)); - } - } - - // TODO: Obtain port description - // branch.ports_delta.push(ComponentPortChange{ - // is_acquired: false, - // port_id: *port_id, - // }); - }, - None => { - // Not in port mapping, so we must have acquired it before, - // remove the acquirement. - let mut to_delete_index: isize = -1; - for (delta_idx, delta) in branch.ports_delta.iter().enumerate() { - if delta.port.self_id == *port_id { - debug_assert!(delta.is_acquired); - to_delete_index = delta_idx as isize; - break; - } - } - - debug_assert!(to_delete_index != -1); - branch.ports_delta.remove(to_delete_index as usize); - } - } - } - - return Ok(()) - } - - /// Acquiring ports during a sync-session. - fn acquire_ports_during_sync(ports: &mut ConnectorPorts, branch: &mut Branch, port_ids: &[PortIdLocal]) -> Result<(), PortOwnershipError> { - if port_ids.is_empty() { - return Ok(()) - } - - todo!("unfinished: add port properties during final solution-commit msgs"); - debug_assert!(branch.index.is_valid()); // branch in sync mode - - 'port_loop: for port_id in port_ids { - for (delta_idx, delta) in branch.ports_delta.iter().enumerate() { - if delta.port.self_id == *port_id { - if delta.is_acquired { - // Somehow already received this port. - // TODO: @security - todo!("take care of nefarious peers"); - } else { - // Sending ports to ourselves - debug_assert!(ports.get_port_index(*port_id).is_some()); - branch.ports_delta.remove(delta_idx); - continue 'port_loop; - } - } - } - - // If here then we can safely acquire the new port - // TODO: Retrieve port infor - // branch.ports_delta.push(PortOwnershipDelta{ - // acquired: true, - // port_id: *port_id, - // }); - } - - return Ok(()) - } - - // Helpers for generating and handling sync messages (and the solutions that - // are described by those sync messages) - - /// Generates the initial solution for a finished sync branch. If initial - /// local solution is valid, then the appropriate message is returned. - /// Otherwise the initial solution is inconsistent. - fn generate_initial_solution_for_branch(&self, branch_id: BranchId, comp_ctx: &ComponentCtxFancy) -> Option { - // Retrieve branchh - debug_assert!(branch_id.is_valid()); // because we're supposed to be in sync mode - let branch = &self.branches[branch_id.index as usize]; - debug_assert_eq!(branch.sync_state, SpeculativeState::ReachedSyncEnd); - - // Set up storage (this is also the storage for all of the connectors - // that will be visited, hence the initial size approximation) - let mut all_branch_ids = Vec::new(); - self.branch_ids_of_execution_path(branch_id, &mut all_branch_ids); - - let num_ports = self.ports.num_ports(); - let approximate_peers = num_ports; - let mut initial_solution_port_mapping = Vec::with_capacity(num_ports); - for port_idx in 0..self.ports.num_ports() { - let port_id = self.ports.get_port_id(port_idx); - let port_desc = self.ports.get_port(branch_id.index, port_idx); - - // Note: if assigned then we expect a valid branch ID. Otherwise we have the "invalid - // branch" as ID, marking that we want it to be silent - debug_assert!(port_desc.is_assigned == port_desc.last_registered_branch_id.is_valid()); - initial_solution_port_mapping.push((port_id, port_desc.last_registered_branch_id)); - } - - let initial_local_solution = SyncConnectorSolution{ - connector_id: comp_ctx.id, - terminating_branch_id: branch_id, - execution_branch_ids: all_branch_ids, - final_port_mapping: initial_solution_port_mapping, - }; - - let mut sync_message = SyncMessage::new(initial_local_solution, approximate_peers); - - // Turn local port mapping into constraints on other connectors - - // - constraints on other components due to transferred ports - for port_delta in &branch.ports_delta { - // For transferred ports we always have two constraints: one for the - // sender and one for the receiver, ensuring it was not used. - // TODO: This will fail if a port is passed around multiple times. - // maybe a special "passed along" entry in `ports_delta`. - if !sync_message.check_constraint(comp_ctx.id, SyncBranchConstraint::SilentPort(port_delta.port.self_id)).unwrap() { - return None; - } - - // Might need to check if we own the other side of the channel - let port = comp_ctx.get_port_by_id(port_delta.port.self_id).unwrap(); - if !sync_message.add_or_check_constraint(port.peer_connector, SyncBranchConstraint::SilentPort(port.peer_id)).unwrap() { - return None; - } - } - - // - constraints on other components due to owned ports - for port_index in 0..self.ports.num_ports() { - let port_id = self.ports.get_port_id(port_index); - let port_mapping = self.ports.get_port(branch_id.index, port_index); - let port = comp_ctx.get_port_by_id(port_id).unwrap(); - - let constraint = if port_mapping.is_assigned { - if port.kind == PortKind::Getter { - SyncBranchConstraint::BranchNumber(port_mapping.last_registered_branch_id) - } else { - SyncBranchConstraint::PortMapping(port.peer_id, port_mapping.last_registered_branch_id) - } - } else { - SyncBranchConstraint::SilentPort(port.peer_id) - }; - - if !sync_message.add_or_check_constraint(port.peer_connector, constraint).unwrap() { - return None; - } - } - - return Some(sync_message); - } - - fn submit_sync_solution(&mut self, partial_solution: SyncMessage, comp_ctx: &mut ComponentCtxFancy) { - if partial_solution.to_visit.is_empty() { - // Solution is completely consistent. So ask everyone to commit - // TODO: Maybe another package for random? - let comparison_number: u64 = unsafe { - let mut random_array = [0u8; 8]; - getrandom::getrandom(&mut random_array).unwrap(); - std::mem::transmute(random_array) - }; - - let num_local = partial_solution.local_solutions.len(); - - let mut full_solution = SolutionMessage{ - comparison_number, - connector_origin: comp_ctx.id, - local_solutions: Vec::with_capacity(num_local), - to_visit: Vec::with_capacity(num_local - 1), - }; - - for local_solution in &partial_solution.local_solutions { - full_solution.local_solutions.push((local_solution.connector_id, local_solution.terminating_branch_id)); - if local_solution.connector_id != comp_ctx.id { - full_solution.to_visit.push(local_solution.connector_id); - } - } - - debug_assert!(self.committed_to.is_none()); - self.committed_to = Some((full_solution.connector_origin, full_solution.comparison_number)); - comp_ctx.submit_message(MessageContents::RequestCommit(full_solution)); - } else { - // Still have connectors to visit - comp_ctx.submit_message(MessageContents::Sync(partial_solution)); - } - } - - fn branch_ids_of_execution_path(&self, leaf_branch_id: BranchId, parents: &mut Vec) { - debug_assert!(parents.is_empty()); - - let mut next_branch_id = leaf_branch_id; - debug_assert!(next_branch_id.is_valid()); - - while next_branch_id.is_valid() { - parents.push(next_branch_id); - let branch = &self.branches[next_branch_id.index as usize]; - next_branch_id = branch.parent_index; - } - } -} - -#[derive(Eq, PartialEq)] -pub(crate) enum ConnectorScheduling { - Immediate, // Run again, immediately - Later, // Schedule for running, at some later point in time - NotNow, // Do not reschedule for running - Exit, // Connector has exited -} - -/// Recursively goes through the value group, attempting to find ports. -/// Duplicates will only be added once. -pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { - // Helper to check a value for a port and recurse if needed. - fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { - match value { - Value::Input(port_id) | Value::Output(port_id) => { - // This is an actual port - let cur_port = PortIdLocal::new(port_id.0.u32_suffix); - for prev_port in ports.iter() { - if *prev_port == cur_port { - // Already added - return; - } - } - - ports.push(cur_port); - }, - Value::Array(heap_pos) | - Value::Message(heap_pos) | - Value::String(heap_pos) | - Value::Struct(heap_pos) | - Value::Union(_, heap_pos) => { - // Reference to some dynamic thing which might contain ports, - // so recurse - let heap_region = &group.regions[*heap_pos as usize]; - for embedded_value in heap_region { - find_port_in_value(group, embedded_value, ports); - } - }, - _ => {}, // values we don't care about - } - } - - // Clear the ports, then scan all the available values - ports.clear(); - for value in &value_group.values { - find_port_in_value(value_group, value, ports); + ctx.notify_sync_end(&[]); } } \ No newline at end of file diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs deleted file mode 100644 index f33e6c5c2bb5bdbcfa4949ebb9fe7ee0c7de128c..0000000000000000000000000000000000000000 --- a/src/runtime2/connector2.rs +++ /dev/null @@ -1,385 +0,0 @@ -use std::collections::HashMap; -/// connector.rs -/// -/// Represents a component. A component (and the scheduler that is running it) -/// has many properties that are not easy to subdivide into aspects that are -/// conceptually handled by particular data structures. That is to say: the code -/// that we run governs: running PDL code, keeping track of ports, instantiating -/// new components and transports (i.e. interacting with the runtime), running -/// a consensus algorithm, etc. But on the other hand, our data is rather -/// simple: we have a speculative execution tree, a set of ports that we own, -/// and a bit of code that we should run. -/// -/// So currently the code is organized as following: -/// - The scheduler that is running the component is the authoritative source on -/// ports during *non-sync* mode. The consensus algorithm is the -/// authoritative source during *sync* mode. They retrieve each other's -/// state during the transitions. Hence port data exists duplicated between -/// these two datastructures. -/// - The execution tree is where executed branches reside. But the execution -/// tree is only aware of the tree shape itself (and keeps track of some -/// queues of branches that are in a particular state), and tends to store -/// the PDL program state. The consensus algorithm is also somewhat aware -/// of the execution tree, but only in terms of what is needed to complete -/// a sync round (for now, that means the port mapping in each branch). -/// Hence once more we have properties conceptually associated with branches -/// in two places. -/// - TODO: Write about handling messages, consensus wrapping data -/// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx - -use std::sync::atomic::AtomicBool; - -use crate::PortId; -use crate::common::ComponentState; -use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::protocol::{RunContext, RunResult}; -use crate::runtime2::consensus::find_ports_in_value_group; -use crate::runtime2::inbox2::DataContent; -use crate::runtime2::port::PortKind; - -use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; -use super::consensus::{Consensus, Consistency}; -use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy, PublicInbox}; -use super::native::Connector; -use super::port::PortIdLocal; -use super::scheduler::{ComponentCtxFancy, SchedulerCtx}; - -pub(crate) struct ConnectorPublic { - pub inbox: PublicInbox, - pub sleeping: AtomicBool, -} - -impl ConnectorPublic { - pub fn new(initialize_as_sleeping: bool) -> Self { - ConnectorPublic{ - inbox: PublicInbox::new(), - sleeping: AtomicBool::new(initialize_as_sleeping), - } - } -} - -#[derive(Eq, PartialEq)] -pub(crate) enum ConnectorScheduling { - Immediate, // Run again, immediately - Later, // Schedule for running, at some later point in time - NotNow, // Do not reschedule for running - Exit, // Connector has exited -} - -pub(crate) struct ConnectorPDL { - tree: ExecTree, - consensus: Consensus, -} - -struct ConnectorRunContext<'a> { - branch_id: BranchId, - consensus: &'a Consensus, - received: &'a HashMap, - scheduler: SchedulerCtx<'a>, - prepared_channel: Option<(Value, Value)>, -} - -impl<'a> RunContext for ConnectorRunContext<'a>{ - fn did_put(&mut self, port: PortId) -> bool { - let port_id = PortIdLocal::new(port.0.u32_suffix); - let annotation = self.consensus.get_annotation(self.branch_id, port_id); - return annotation.registered_id.is_some(); - } - - fn get(&mut self, port: PortId) -> Option { - let port_id = PortIdLocal::new(port.0.u32_suffix); - match self.received.get(&port_id) { - Some(data) => Some(data.clone()), - None => None, - } - } - - fn fires(&mut self, port: PortId) -> Option { - let port_id = PortIdLocal::new(port.0.u32_suffix); - let annotation = self.consensus.get_annotation(self.branch_id, port_id); - return annotation.expected_firing.map(|v| Value::Bool(v)); - } - - fn get_channel(&mut self) -> Option<(Value, Value)> { - return self.prepared_channel.take(); - } -} - -impl Connector for ConnectorPDL { - fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { - self.handle_new_messages(comp_ctx); - if self.tree.is_in_sync() { - let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); - if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { - self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); - return ConnectorScheduling::Immediate; - } else { - return scheduling - } - } else { - let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); - return scheduling; - } - } -} - -impl ConnectorPDL { - pub fn new(initial: ComponentState) -> Self { - Self{ - tree: ExecTree::new(initial), - consensus: Consensus::new(), - } - } - - // --- Handling messages - - pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) { - while let Some(message) = ctx.read_next_message() { - match message { - MessageFancy::Data(message) => self.handle_new_data_message(message, ctx), - MessageFancy::Sync(message) => self.handle_new_sync_message(message, ctx), - MessageFancy::Control(_) => unreachable!("control message in component"), - } - } - } - - pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) { - // Go through all branches that are awaiting new messages and see if - // there is one that can receive this message. - debug_assert!(ctx.workspace_branches.is_empty()); - let mut branches = Vec::new(); // TODO: @Remove - self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches); - - for branch_id in branches.drain(..) { - // This branch can receive, so fork and given it the message - let receiving_branch_id = self.tree.fork_branch(branch_id); - self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - let receiving_branch = &mut self.tree[receiving_branch_id]; - - receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); - - // And prepare the branch for running - self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); - } - } - - pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) { - if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { - self.collapse_sync_to_solution_branch(solution_branch_id, ctx); - } - } - - // --- Running code - - pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { - // Check if we have any branch that needs running - debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync()); - let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); - if branch_id.is_none() { - return ConnectorScheduling::NotNow; - } - - // Retrieve the branch and run it - let branch_id = branch_id.unwrap(); - let branch = &mut self.tree[branch_id]; - - let mut run_context = ConnectorRunContext{ - branch_id, - consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - prepared_channel: branch.prepared_channel.take(), - }; - let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); - - // Handle the returned result. Note that this match statement contains - // explicit returns in case the run result requires that the component's - // code is ran again immediately - match run_result { - RunResult::BranchInconsistent => { - // Branch became inconsistent - branch.sync_state = SpeculativeState::Inconsistent; - }, - RunResult::BranchMissingPortState(port_id) => { - // Branch called `fires()` on a port that has not been used yet. - let port_id = PortIdLocal::new(port_id.0.u32_suffix); - - // Create two forks, one that assumes the port will fire, and - // one that assumes the port remains silent - branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - - let firing_branch_id = self.tree.fork_branch(branch_id); - let silent_branch_id = self.tree.fork_branch(branch_id); - self.consensus.notify_of_new_branch(branch_id, firing_branch_id); - let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true); - debug_assert_eq!(_result, Consistency::Valid); - self.consensus.notify_of_new_branch(branch_id, silent_branch_id); - let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false); - debug_assert_eq!(_result, Consistency::Valid); - - // Somewhat important: we push the firing one first, such that - // that branch is ran again immediately. - self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id); - self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id); - - return ConnectorScheduling::Immediate; - }, - RunResult::BranchMissingPortValue(port_id) => { - // Branch performed a `get()` on a port that does not have a - // received message on that port. - let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `get()` is valid, so mark the branch as awaiting a message - branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - branch.awaiting_port = port_id; - self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); - - // Note: we only know that a branch is waiting on a message when - // it reaches the `get` call. But we might have already received - // a message that targets this branch, so check now. - let mut any_branch_received = false; - for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) { - // This branch can receive the message, so we do the - // fork-and-receive dance - let receiving_branch_id = self.tree.fork_branch(branch_id); - let branch = &mut self.tree[receiving_branch_id]; - - branch.insert_message(port_id, message.content.as_message().unwrap().clone()); - - self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); - self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); - - any_branch_received = true; - } - } - - if any_branch_received { - return ConnectorScheduling::Immediate; - } - } else { - branch.sync_state = SpeculativeState::Inconsistent; - } - } - RunResult::BranchAtSyncEnd => { - let consistency = self.consensus.notify_of_finished_branch(branch_id); - if consistency == Consistency::Valid { - branch.sync_state = SpeculativeState::ReachedSyncEnd; - self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); - } else if consistency == Consistency::Inconsistent { - branch.sync_state = SpeculativeState::Inconsistent; - } - }, - RunResult::BranchPut(port_id, content) => { - // Branch is attempting to send data - let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `put()` is valid. - let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{ - sync_header, data_header, - content: DataContent::Message(content), - })); - - self.tree.push_into_queue(QueueKind::Runnable, branch_id); - return ConnectorScheduling::Immediate; - } else { - branch.sync_state = SpeculativeState::Inconsistent; - } - }, - _ => unreachable!("unexpected run result {:?} in sync mode", run_result), - } - - // If here then the run result did not require a particular action. We - // return whether we have more active branches to run or not. - if self.tree.queue_is_empty(QueueKind::Runnable) { - return ConnectorScheduling::NotNow; - } else { - return ConnectorScheduling::Later; - } - } - - pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { - debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync()); - - let branch = self.tree.base_branch_mut(); - debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); - - let mut run_context = ConnectorRunContext{ - branch_id: branch.id, - consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - prepared_channel: branch.prepared_channel.take(), - }; - let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); - - match run_result { - RunResult::ComponentTerminated => { - branch.sync_state = SpeculativeState::Finished; - - return ConnectorScheduling::Exit; - }, - RunResult::ComponentAtSyncStart => { - comp_ctx.notify_sync_start(); - let sync_branch_id = self.tree.start_sync(); - self.consensus.start_sync(comp_ctx); - self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); - self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); - - return ConnectorScheduling::Immediate; - }, - RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { - // Note: we're relinquishing ownership of ports. But because - // we are in non-sync mode the scheduler will handle and check - // port ownership transfer. - debug_assert!(comp_ctx.workspace_ports.is_empty()); - find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports); - - let new_state = ComponentState { - prompt: Prompt::new( - &sched_ctx.runtime.protocol_description.types, - &sched_ctx.runtime.protocol_description.heap, - definition_id, monomorph_idx, arguments - ), - }; - let new_component = ConnectorPDL::new(new_state); - comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone()); - comp_ctx.workspace_ports.clear(); - - return ConnectorScheduling::Later; - }, - RunResult::NewChannel => { - let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); - debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); - branch.prepared_channel = Some(( - Value::Input(PortId::new(putter.self_id.index)), - Value::Output(PortId::new(getter.self_id.index)), - )); - - comp_ctx.push_port(putter); - comp_ctx.push_port(getter); - - return ConnectorScheduling::Immediate; - }, - _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), - } - } - - pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtxFancy) { - let mut fake_vec = Vec::new(); - self.tree.end_sync(solution_branch_id); - self.consensus.end_sync(solution_branch_id, &mut fake_vec); - - for port in fake_vec { - // TODO: Handle sent/received ports - debug_assert!(ctx.get_port_by_id(port).is_some()); - } - - ctx.notify_sync_end(&[]); - } -} \ No newline at end of file diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index d0c833ceb22ef431c45ac02b134bac28b70bd53a..c3a2947ea86c1ed9c9e458826ca585c236ca9d1d 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,15 +1,16 @@ use crate::collections::VecSet; + use crate::protocol::eval::ValueGroup; -use crate::runtime2::inbox2::DataContent; use super::branch::{BranchId, ExecTree, QueueKind}; use super::ConnectorId; -use super::port::{ChannelId, Port, PortIdLocal}; -use super::inbox2::{ - DataHeader, DataMessageFancy, MessageFancy, - SyncContent, SyncHeader, SyncMessageFancy, PortAnnotation +use super::port::{ChannelId, PortIdLocal}; +use super::inbox::{ + Message, PortAnnotation, + DataMessage, DataContent, DataHeader, + SyncMessage, SyncContent, SyncHeader, }; -use super::scheduler::ComponentCtxFancy; +use super::scheduler::ComponentCtx; struct BranchAnnotation { port_mapping: Vec, @@ -94,7 +95,7 @@ impl Consensus { /// Sets up the consensus algorithm for a new synchronous round. The /// provided ports should be the ports the component owns at the start of /// the sync round. - pub fn start_sync(&mut self, ctx: &ComponentCtxFancy) { + pub fn start_sync(&mut self, ctx: &ComponentCtx) { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); debug_assert!(self.last_finished_handled.is_none()); @@ -183,7 +184,7 @@ impl Consensus { /// Generates sync messages for any branches that are at the end of the /// sync block. To find these branches, they should've been put in the /// "finished" queue in the execution tree. - pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) -> Option { + pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtx) -> Option { debug_assert!(self.is_in_sync()); let mut last_branch_id = self.last_finished_handled; @@ -201,7 +202,7 @@ impl Consensus { let channel_id = port_desc.channel_id; if !self.encountered_ports.contains(&port.port_id) { - ctx.submit_message(MessageFancy::Data(DataMessageFancy{ + ctx.submit_message(Message::Data(DataMessage { sync_header: SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, @@ -265,7 +266,7 @@ impl Consensus { /// Prepares a message for sending. Caller should have made sure that /// sending the message is consistent with the speculative state. - pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) { + pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) { debug_assert!(self.is_in_sync()); let branch = &mut self.branch_annotations[branch_id.index as usize]; @@ -317,7 +318,7 @@ impl Consensus { /// `branch_can_receive` function. /// 2. We return the branches that *can* receive the message, you still /// have to explicitly call `notify_of_received_message`. - pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessageFancy, ctx: &mut ComponentCtxFancy, target_ids: &mut Vec) { + pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) { self.handle_received_data_header(exec_tree, &message.data_header, &message.content, target_ids); self.handle_received_sync_header(&message.sync_header, ctx); } @@ -325,7 +326,7 @@ impl Consensus { /// Handles a new sync message by handling the sync header and the contents /// of the message. Returns `Some` with the branch ID of the global solution /// if the sync solution has been found. - pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) -> Option { + pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) -> Option { self.handle_received_sync_header(&message.sync_header, ctx); // And handle the contents @@ -419,7 +420,7 @@ impl Consensus { } } - fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) { + fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) { debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves self.encountered_peers.push(sync_header.sending_component_id); @@ -434,12 +435,12 @@ impl Consensus { continue } - let message = SyncMessageFancy{ + let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: *encountered_id, content: SyncContent::Notification, }; - ctx.submit_message(MessageFancy::Sync(message)); + ctx.submit_message(Message::Sync(message)); } // But also send our locally combined solution @@ -447,16 +448,16 @@ impl Consensus { } else if sync_header.highest_component_id < self.highest_connector_id { // Sender has lower leader ID, so it should know about our higher // one. - let message = SyncMessageFancy{ + let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: sync_header.sending_component_id, content: SyncContent::Notification }; - ctx.submit_message(MessageFancy::Sync(message)); + ctx.submit_message(Message::Sync(message)); } // else: exactly equal, so do nothing } - fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) -> Option { + fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtx) -> Option { println!("DEBUG [....:.. conn:{:02}]: Storing local solution for component {}, branch {}", ctx.id.0, solution.component.0, solution.final_branch_id.index); if self.highest_connector_id == ctx.id { @@ -470,12 +471,12 @@ impl Consensus { continue; } - let message = SyncMessageFancy{ + let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: connector_id, content: SyncContent::GlobalSolution(global_solution.clone()), }; - ctx.submit_message(MessageFancy::Sync(message)); + ctx.submit_message(Message::Sync(message)); } debug_assert!(my_final_branch_id.is_valid()); @@ -485,34 +486,34 @@ impl Consensus { } } else { // Someone else is the leader - let message = SyncMessageFancy{ + let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: self.highest_connector_id, content: SyncContent::LocalSolution(solution), }; - ctx.submit_message(MessageFancy::Sync(message)); + ctx.submit_message(Message::Sync(message)); return None; } } #[inline] - fn create_sync_header(&self, ctx: &ComponentCtxFancy) -> SyncHeader { + fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader { return SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, } } - fn forward_local_solutions(&mut self, ctx: &mut ComponentCtxFancy) { + fn forward_local_solutions(&mut self, ctx: &mut ComponentCtx) { debug_assert_ne!(self.highest_connector_id, ctx.id); for local_solution in self.solution_combiner.drain() { - let message = SyncMessageFancy{ + let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: self.highest_connector_id, content: SyncContent::LocalSolution(local_solution), }; - ctx.submit_message(MessageFancy::Sync(message)); + ctx.submit_message(Message::Sync(message)); } } } diff --git a/src/runtime2/global_store.rs b/src/runtime2/global_store.rs deleted file mode 100644 index ded4be362c43d4da660cd1117837216957f33af9..0000000000000000000000000000000000000000 --- a/src/runtime2/global_store.rs +++ /dev/null @@ -1,179 +0,0 @@ -use std::ptr; -use std::sync::{Arc, RwLock}; -use std::sync::atomic::{AtomicBool, AtomicU32}; - -use crate::collections::{MpmcQueue, RawVec}; -use crate::ProtocolDescription; - -use super::scheduler::{Router, ConnectorCtx}; -use super::connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling, RunDeltaState}; -use super::inbox::Message; -use super::native::{Connector, ConnectorApplication}; - - -/// The registry containing all connectors. The idea here is that when someone -/// owns a `ConnectorKey`, then one has unique access to that connector. -/// Otherwise one has shared access. -/// -/// This datastructure is built to be wrapped in a RwLock. -pub(crate) struct ConnectorStore { - pub(crate) port_counter: Arc, - inner: RwLock, -} - -struct ConnectorStoreInner { - connectors: RawVec<*mut ScheduledConnector>, - free: Vec, -} - -impl ConnectorStore { - fn with_capacity(capacity: usize) -> Self { - return Self{ - port_counter: Arc::new(AtomicU32::new(0)), - inner: RwLock::new(ConnectorStoreInner { - connectors: RawVec::with_capacity(capacity), - free: Vec::with_capacity(capacity), - }), - }; - } - - /// Retrieves the shared members of the connector. - pub(crate) fn get_shared(&self, connector_id: ConnectorId) -> &'static ConnectorPublic { - let lock = self.inner.read().unwrap(); - - unsafe { - let connector = lock.connectors.get(connector_id.0 as usize); - debug_assert!(!connector.is_null()); - return &(**connector).public; - } - } - - /// Retrieves a particular connector. Only the thread that pulled the - /// associated key out of the execution queue should (be able to) call this. - pub(crate) fn get_mut(&self, key: &ConnectorKey) -> &'static mut ScheduledConnector { - let lock = self.inner.read().unwrap(); - - unsafe { - let connector = lock.connectors.get_mut(key.index as usize); - debug_assert!(!connector.is_null()); - return &mut (**connector); - } - } - - pub(crate) fn create_interface(&self, connector: ConnectorApplication) -> ConnectorKey { - // Connector interface does not own any initial ports, and cannot be - // created by another connector - let key = self.create_connector_raw(ConnectorVariant::Native(Box::new(connector)), true); - return key; - } - - /// Create a new connector, returning the key that can be used to retrieve - /// and/or queue it. The caller must make sure that the constructed - /// connector's code is initialized with the same ports as the ports in the - /// `initial_ports` array. Furthermore the connector is initialized as not - /// sleeping, so MUST be put on the connector queue by the caller. - pub(crate) fn create_pdl(&self, created_by: &mut ScheduledConnector, connector: ConnectorPDL) -> ConnectorKey { - let key = self.create_connector_raw(ConnectorVariant::UserDefined(connector), false); - let new_connector = self.get_mut(&key); - - // Transferring ownership of ports (and crashing if there is a - // programmer's mistake in port management) - match &new_connector.connector { - ConnectorVariant::UserDefined(connector) => { - for port_id in &connector.ports.owned_ports { - let mut port = created_by.context.remove_port(*port_id); - new_connector.context.add_port(port); - } - }, - ConnectorVariant::Native(_) => unreachable!(), - } - - return key; - } - - pub(crate) fn destroy(&self, key: ConnectorKey) { - let mut lock = self.inner.write().unwrap(); - - unsafe { - let connector = lock.connectors.get_mut(key.index as usize); - ptr::drop_in_place(*connector); - // Note: but not deallocating! - } - - lock.free.push(key.index as usize); - } - - /// Creates a connector but does not set its initial ports - fn create_connector_raw(&self, connector: ConnectorVariant, initialize_as_sleeping: bool) -> ConnectorKey { - // Creation of the connector in the global store, requires a lock - let index; - { - let mut lock = self.inner.write().unwrap(); - let connector = ScheduledConnector { - connector, - context: ConnectorCtx::new(self.port_counter.clone()), - public: ConnectorPublic::new(initialize_as_sleeping), - router: Router::new(), - }; - - if lock.free.is_empty() { - let connector = Box::into_raw(Box::new(connector)); - - index = lock.connectors.len(); - lock.connectors.push(connector); - } else { - index = lock.free.pop().unwrap(); - - unsafe { - let target = lock.connectors.get_mut(index); - debug_assert!(!target.is_null()); - ptr::write(*target, connector); - } - } - } - - // Generate key and retrieve the connector to set its ID - let key = ConnectorKey{ index: index as u32 }; - let new_connector = self.get_mut(&key); - new_connector.context.id = key.downcast(); - - // Return the connector key - return key; - } -} - -impl Drop for ConnectorStore { - fn drop(&mut self) { - let lock = self.inner.write().unwrap(); - - for idx in 0..lock.connectors.len() { - unsafe { - let memory = *lock.connectors.get_mut(idx); - let _ = Box::from_raw(memory); // takes care of deallocation - } - } - } -} - -/// Global store of connectors, ports and queues that are used by the sceduler -/// threads. The global store has the appearance of a thread-safe datatype, but -/// one needs to be careful using it. -/// -/// TODO: @docs -/// TODO: @Optimize, very lazy implementation of concurrent datastructures. -/// This includes the `should_exit` and `did_exit` pair! -pub(crate) struct GlobalStore { - pub connector_queue: MpmcQueue, - pub connectors: ConnectorStore, - pub should_exit: AtomicBool, // signal threads to exit -} - -impl GlobalStore { - pub(crate) fn new() -> Self { - Self{ - connector_queue: MpmcQueue::with_capacity(256), - connectors: ConnectorStore::with_capacity(256), - should_exit: AtomicBool::new(false), - } - } -} \ No newline at end of file diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs index a26cf17373de3aab4a0c652c59288d1e4934cab1..808fe06a90cb43fb4205329a7104e634a10e41b5 100644 --- a/src/runtime2/inbox.rs +++ b/src/runtime2/inbox.rs @@ -1,211 +1,137 @@ -/** -inbox.rs - -Contains various types of inboxes and message types for the connectors. There -are two kinds of inboxes: - -The `PublicInbox` is a simple message queue. Messages are put in by various -threads, and they're taken out by a single thread. These messages may contain -control messages and may be filtered or redirected by the scheduler. - -The `PrivateInbox` is a temporary storage for all messages that are received -within a certain sync-round. -**/ - -use std::collections::VecDeque; use std::sync::Mutex; +use std::collections::VecDeque; -use super::ConnectorId; use crate::protocol::eval::ValueGroup; -use crate::runtime2::inbox2::MessageFancy; -use super::connector::BranchId; + +use super::ConnectorId; +use super::branch::BranchId; +use super::consensus::{GlobalSolution, LocalSolution}; use super::port::PortIdLocal; -/// A message that has been delivered (after being imbued with the receiving -/// port by the scheduler) to a connector. -// TODO: Remove Debug on messages -#[derive(Debug, Clone)] -pub struct DataMessage { - pub sending_port: PortIdLocal, - pub sender_prev_branch_id: BranchId, - pub sender_cur_branch_id: BranchId, - pub message: ValueGroup, -} +// TODO: Remove Debug derive from all types -#[derive(Debug, Clone)] -pub enum SyncBranchConstraint { - SilentPort(PortIdLocal), - BranchNumber(BranchId), - PortMapping(PortIdLocal, BranchId), +#[derive(Debug, Copy, Clone)] +pub(crate) struct PortAnnotation { + pub port_id: PortIdLocal, + pub registered_id: Option, + pub expected_firing: Option, } +/// The header added by the synchronization algorithm to all. #[derive(Debug, Clone)] -pub struct SyncConnectorSolution { - pub connector_id: ConnectorId, - pub terminating_branch_id: BranchId, - pub execution_branch_ids: Vec, // no particular ordering of IDs enforced - pub final_port_mapping: Vec<(PortIdLocal, BranchId)> +pub(crate) struct SyncHeader { + pub sending_component_id: ConnectorId, + pub highest_component_id: ConnectorId, } +/// The header added to data messages #[derive(Debug, Clone)] -pub struct SyncConnectorConstraints { - pub connector_id: ConnectorId, - pub constraints: Vec, +pub(crate) struct DataHeader { + pub expected_mapping: Vec, + pub sending_port: PortIdLocal, + pub target_port: PortIdLocal, + pub new_mapping: BranchId, } +// TODO: Very much on the fence about this. On one hand I thought making it a +// data message was neat because "silent port notification" should be rerouted +// like any other data message to determine the component ID of the receiver +// and to make it part of the leader election algorithm for the sync leader. +// However: it complicates logic quite a bit. Really it might be easier to +// create `Message::SyncAtComponent` and `Message::SyncAtPort` messages... #[derive(Debug, Clone)] -pub struct SyncMessage { - pub local_solutions: Vec, - pub constraints: Vec, - pub to_visit: Vec, +pub(crate) enum DataContent { + SilentPortNotification, + Message(ValueGroup), } -// TODO: Shouldn't really be here, right? -impl SyncMessage { - /// Creates a new sync message. Assumes that it is created by a connector - /// that has just encountered a new local solution. - pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self { - let mut local_solutions = Vec::with_capacity(approximate_peers); - local_solutions.push(initial_solution); - - return Self{ - local_solutions, - constraints: Vec::with_capacity(approximate_peers), - to_visit: Vec::with_capacity(approximate_peers), - }; - } - - /// Checks if a connector has already provided a local solution - pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool { - return self.local_solutions - .iter() - .any(|v| v.connector_id == connector_id); - } - - /// Adds a new constraint. If the connector has already provided a local - /// solution then the constraint will be checked. Otherwise the constraint - /// will be added to the solution. If this is the first constraint for a - /// connector then it will be added to the connectors that still have to be - /// visited. - /// - /// If this returns true then the constraint was added, or the local - /// solution for the specified connector satisfies the constraint. If this - /// function returns an error then we're dealing with a nefarious peer. - pub(crate) fn add_or_check_constraint( - &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint - ) -> Result { - if self.has_local_solution_for(connector_id) { - return self.check_constraint(connector_id, constraint); - } else { - self.add_constraint(connector_id, constraint); - return Ok(true); +impl DataContent { + pub(crate) fn as_message(&self) -> Option<&ValueGroup> { + match self { + DataContent::SilentPortNotification => None, + DataContent::Message(message) => Some(message), } } +} - /// Pushes a new connector constraint. Caller must ensure that the solution - /// has not yet arrived at the specified connector (because then it would no - /// longer have constraints, but a proposed solution instead). - pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) { - debug_assert!(!self.has_local_solution_for(connector_id)); - - let position = self.constraints - .iter() - .position(|v| v.connector_id == connector_id); - - match position { - Some(index) => { - // Has pre-existing constraints - debug_assert!(self.to_visit.contains(&connector_id)); - let entry = &mut self.constraints[index]; - entry.constraints.push(constraint); - }, - None => { - debug_assert!(!self.to_visit.contains(&connector_id)); - self.constraints.push(SyncConnectorConstraints{ - connector_id, - constraints: vec![constraint], - }); - self.to_visit.push(connector_id); - } - } - } +/// A data message is a message that is intended for the receiver's PDL code, +/// but will also be handled by the consensus algorithm +#[derive(Debug, Clone)] +pub(crate) struct DataMessage { + pub sync_header: SyncHeader, + pub data_header: DataHeader, + pub content: DataContent, +} - /// Checks if a constraint is satisfied by a solution. Caller must make sure - /// that a local solution has already been provided. Will return an error - /// value only if the provided constraint does not make sense (i.e. a - /// nefarious peer has supplied a constraint with a port we do not own). - pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result { - debug_assert!(self.has_local_solution_for(connector_id)); - - let entry = self.local_solutions - .iter() - .find(|v| v.connector_id == connector_id) - .unwrap(); - - match constraint { - SyncBranchConstraint::SilentPort(silent_port_id) => { - for (port_id, mapped_id) in &entry.final_port_mapping { - if *port_id == silent_port_id { - // If silent, then mapped ID is invalid - return Ok(!mapped_id.is_valid()) - } - } - - return Err(()); - }, - SyncBranchConstraint::BranchNumber(expected_branch_id) => { - return Ok(entry.execution_branch_ids.contains(&expected_branch_id)); - }, - SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => { - for (port_id, mapped_id) in &entry.final_port_mapping { - if port_id == port_id { - return Ok(*mapped_id == expected_branch_id); - } - } - - return Err(()); - }, - } - } +#[derive(Debug)] +pub(crate) enum SyncContent { + LocalSolution(LocalSolution), // sending a local solution to the leader + GlobalSolution(GlobalSolution), // broadcasting to everyone + Notification, // just a notification (so purpose of message is to send the SyncHeader) } -#[derive(Debug, Clone)] -pub struct SolutionMessage { - pub comparison_number: u64, - pub connector_origin: ConnectorId, - pub local_solutions: Vec<(ConnectorId, BranchId)>, - pub to_visit: Vec, +/// A sync message is a message that is intended only for the consensus +/// algorithm. +#[derive(Debug)] +pub(crate) struct SyncMessage { + pub sync_header: SyncHeader, + pub target_component_id: ConnectorId, + pub content: SyncContent, } -/// A control message. These might be sent by the scheduler to notify eachother -/// of asynchronous state changes. -#[derive(Debug, Clone)] -pub struct ControlMessage { +/// A control message is a message intended for the scheduler that is executing +/// a component. +#[derive(Debug)] +pub(crate) struct ControlMessage { pub id: u32, // generic identifier, used to match request to response - pub content: ControlMessageVariant, + pub sending_component_id: ConnectorId, + pub content: ControlContent, } -#[derive(Debug, Clone)] -pub enum ControlMessageVariant { - ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port - CloseChannel(PortIdLocal), // close the port associated with this - Ack, // acknowledgement of previous control message, matching occurs through control message ID. +#[derive(Debug)] +pub(crate) enum ControlContent { + PortPeerChanged(PortIdLocal, ConnectorId), + CloseChannel(PortIdLocal), + Ack, + Ping, } -/// Generic message contents. -#[derive(Debug, Clone)] -pub enum MessageContents { - Data(DataMessage), // data message, handled by connector - Sync(SyncMessage), // sync message, handled by both connector/scheduler - RequestCommit(SolutionMessage), // solution message, requesting participants to commit - ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to - Control(ControlMessage), // control message, handled by scheduler - Ping, // ping message, intentionally waking up a connector (used for native connectors) +/// Combination of data message and control messages. +#[derive(Debug)] +pub(crate) enum Message { + Data(DataMessage), + Sync(SyncMessage), + Control(ControlMessage), } -#[derive(Debug)] -pub struct Message { - pub sending_connector: ConnectorId, - pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector) - pub contents: MessageContents, +/// The public inbox of a connector. The thread running the connector that owns +/// this inbox may retrieved from it. Non-owning threads may only put new +/// messages inside of it. +// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. +// Should behave as a MPSC queue. +pub struct PublicInbox { + messages: Mutex>, +} + +impl PublicInbox { + pub fn new() -> Self { + Self{ + messages: Mutex::new(VecDeque::new()), + } + } + + pub(crate) fn insert_message(&self, message: Message) { + let mut lock = self.messages.lock().unwrap(); + lock.push_back(message); + } + + pub(crate) fn take_message(&self) -> Option { + let mut lock = self.messages.lock().unwrap(); + return lock.pop_front(); + } + + pub fn is_empty(&self) -> bool { + let lock = self.messages.lock().unwrap(); + return lock.is_empty(); + } } \ No newline at end of file diff --git a/src/runtime2/inbox2.rs b/src/runtime2/inbox2.rs deleted file mode 100644 index 238188d73d34d89f3c378e7262557fdaf37dfab5..0000000000000000000000000000000000000000 --- a/src/runtime2/inbox2.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::sync::Mutex; -use std::collections::VecDeque; - -use crate::protocol::eval::ValueGroup; -use crate::runtime2::branch::BranchId; -use crate::runtime2::ConnectorId; -use crate::runtime2::consensus::{GlobalSolution, LocalSolution}; -use crate::runtime2::port::PortIdLocal; - -// TODO: Remove Debug derive from all types - -#[derive(Debug, Copy, Clone)] -pub(crate) struct PortAnnotation { - pub port_id: PortIdLocal, - pub registered_id: Option, - pub expected_firing: Option, -} - -/// The header added by the synchronization algorithm to all. -#[derive(Debug, Clone)] -pub(crate) struct SyncHeader { - pub sending_component_id: ConnectorId, - pub highest_component_id: ConnectorId, -} - -/// The header added to data messages -#[derive(Debug, Clone)] -pub(crate) struct DataHeader { - pub expected_mapping: Vec, - pub sending_port: PortIdLocal, - pub target_port: PortIdLocal, - pub new_mapping: BranchId, -} - -// TODO: Very much on the fence about this. On one hand I thought making it a -// data message was neat because "silent port notification" should be rerouted -// like any other data message to determine the component ID of the receiver -// and to make it part of the leader election algorithm for the sync leader. -// However: it complicates logic quite a bit. Really it might be easier to -// create `Message::SyncAtComponent` and `Message::SyncAtPort` messages... -#[derive(Debug, Clone)] -pub(crate) enum DataContent { - SilentPortNotification, - Message(ValueGroup), -} - -impl DataContent { - pub(crate) fn as_message(&self) -> Option<&ValueGroup> { - match self { - DataContent::SilentPortNotification => None, - DataContent::Message(message) => Some(message), - } - } -} - -/// A data message is a message that is intended for the receiver's PDL code, -/// but will also be handled by the consensus algorithm -#[derive(Debug, Clone)] -pub(crate) struct DataMessageFancy { - pub sync_header: SyncHeader, - pub data_header: DataHeader, - pub content: DataContent, -} - -#[derive(Debug)] -pub(crate) enum SyncContent { - LocalSolution(LocalSolution), // sending a local solution to the leader - GlobalSolution(GlobalSolution), // broadcasting to everyone - Notification, // just a notification (so purpose of message is to send the SyncHeader) -} - -/// A sync message is a message that is intended only for the consensus -/// algorithm. -#[derive(Debug)] -pub(crate) struct SyncMessageFancy { - pub sync_header: SyncHeader, - pub target_component_id: ConnectorId, - pub content: SyncContent, -} - -/// A control message is a message intended for the scheduler that is executing -/// a component. -#[derive(Debug)] -pub(crate) struct ControlMessageFancy { - pub id: u32, // generic identifier, used to match request to response - pub sending_component_id: ConnectorId, - pub content: ControlContent, -} - -#[derive(Debug)] -pub(crate) enum ControlContent { - PortPeerChanged(PortIdLocal, ConnectorId), - CloseChannel(PortIdLocal), - Ack, - Ping, -} - -/// Combination of data message and control messages. -#[derive(Debug)] -pub(crate) enum MessageFancy { - Data(DataMessageFancy), - Sync(SyncMessageFancy), - Control(ControlMessageFancy), -} - -/// The public inbox of a connector. The thread running the connector that owns -/// this inbox may retrieved from it. Non-owning threads may only put new -/// messages inside of it. -// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads. -// Should behave as a MPSC queue. -pub struct PublicInbox { - messages: Mutex>, -} - -impl PublicInbox { - pub fn new() -> Self { - Self{ - messages: Mutex::new(VecDeque::new()), - } - } - - pub(crate) fn insert_message(&self, message: MessageFancy) { - let mut lock = self.messages.lock().unwrap(); - lock.push_back(message); - } - - pub(crate) fn take_message(&self) -> Option { - let mut lock = self.messages.lock().unwrap(); - return lock.pop_front(); - } - - pub fn is_empty(&self) -> bool { - let lock = self.messages.lock().unwrap(); - return lock.is_empty(); - } -} \ No newline at end of file diff --git a/src/runtime2/messages.rs b/src/runtime2/messages.rs deleted file mode 100644 index e0273ef5ad58790624546ec061fce56770775e6a..0000000000000000000000000000000000000000 --- a/src/runtime2/messages.rs +++ /dev/null @@ -1,132 +0,0 @@ -use std::collections::hash_map::Entry; -use std::collections::HashMap; - -use crate::PortId; -use crate::protocol::eval::*; - -/// A message residing in a connector's inbox (waiting to be put into some kind -/// of speculative branch), or a message waiting to be sent. -#[derive(Clone)] -pub struct BufferedMessage { - pub(crate) sending_port: PortId, - pub(crate) receiving_port: PortId, - pub(crate) peer_prev_branch_id: Option, - pub(crate) peer_cur_branch_id: u32, - pub(crate) message: ValueGroup, -} - -/// A connector's global inbox. Any received message ends up here. This is -/// because a message might be received before a branch arrives at the -/// corresponding `get()` that is supposed to receive that message. Hence we -/// need to store it for all future branches that might be able to receive it. -pub struct ConnectorInbox { - // TODO: @optimize, HashMap + Vec is a bit stupid. - messages: HashMap> -} - - -/// An action performed on a port. Unsure about this -#[derive(PartialEq, Eq, Hash)] -struct PortAction { - port_id: u32, - prev_branch_id: Option, -} - -// TODO: @remove -impl ConnectorInbox { - pub fn new() -> Self { - Self { - messages: HashMap::new(), - } - } - - /// Inserts a new message into the inbox. - pub fn insert_message(&mut self, message: BufferedMessage) { - // TODO: @error - Messages are received from actors we generally cannot - // trust, and may be unreliable, so messages may be received multiple - // times or have spoofed branch IDs. Debug asserts are present for the - // initial implementation. - - // If it is the first message on the port, then we cannot possible have - // a previous port mapping on that port. - let port_action = PortAction{ - port_id: message.receiving_port.0.u32_suffix, - prev_branch_id: message.peer_prev_branch_id, - }; - - match self.messages.entry(port_action) { - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - debug_assert!( - entry.iter() - .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id) - .is_none(), - "inbox already contains sent message (same new branch ID)" - ); - - entry.push(message); - }, - Entry::Vacant(entry) => { - entry.insert(vec![message]); - } - } - } - - /// Checks if the provided port (and the branch id mapped to that port) - /// correspond to any messages in the inbox. - pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option) -> Option<&[BufferedMessage]> { - let port_action = PortAction{ - port_id, - prev_branch_id: prev_branch_id_at_port, - }; - - match self.messages.get(&port_action) { - Some(messages) => return Some(messages.as_slice()), - None => return None, - } - } - - pub fn clear(&mut self) { - self.messages.clear(); - } -} - -/// A connector's outbox. A temporary storage for messages that are sent by -/// branches performing `put`s until we're done running all branches and can -/// actually transmit the messages. -pub struct ConnectorOutbox { - messages: Vec, -} - -impl ConnectorOutbox { - pub fn new() -> Self { - Self{ - messages: Vec::new(), - } - } - - pub fn insert_message(&mut self, message: BufferedMessage) { - // TODO: @error - Depending on the way we implement the runtime in the - // future we might end up not trusting "our own code" (i.e. in case - // the connectors we are running are described by foreign code) - debug_assert!( - self.messages.iter() - .find(|v| - v.sending_port == message.sending_port && - v.peer_prev_branch_id == message.peer_prev_branch_id - ) - .is_none(), - "messages was already registered for sending" - ); - - self.messages.push(message); - } - - pub fn take_next_message_to_send(&mut self) -> Option { - self.messages.pop() - } - - pub fn clear(&mut self) { - self.messages.clear(); - } -} \ No newline at end of file diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index af4373aca8c049f02a7d7bb0d07ba639819c9714..407fda0c76334eef9db7b1877ea42873d4b00045 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -1,18 +1,14 @@ // Structure of module -mod runtime; -mod messages; -// mod connector; mod branch; mod native; mod port; mod scheduler; -// mod inbox; mod consensus; -mod inbox2; +mod inbox; #[cfg(test)] mod tests; -mod connector2; +mod connector; // Imports @@ -24,10 +20,10 @@ use std::thread::{self, JoinHandle}; use crate::collections::RawVec; use crate::ProtocolDescription; -use connector2::{ConnectorPDL, ConnectorPublic, ConnectorScheduling}; -use scheduler::{Scheduler, ComponentCtxFancy, SchedulerCtx, ControlMessageHandler}; +use connector::{ConnectorPDL, ConnectorPublic, ConnectorScheduling}; +use scheduler::{Scheduler, ComponentCtx, SchedulerCtx, ControlMessageHandler}; use native::{Connector, ConnectorApplication, ApplicationInterface}; -use inbox2::MessageFancy; +use inbox::Message; use port::{ChannelId, Port, PortState}; /// A kind of token that, once obtained, allows mutable access to a connector. @@ -81,7 +77,7 @@ pub(crate) enum ConnectorVariant { } impl Connector for ConnectorVariant { - fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + fn run(&mut self, scheduler_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { match self { ConnectorVariant::UserDefined(c) => c.run(scheduler_ctx, comp_ctx), ConnectorVariant::Native(c) => c.run(scheduler_ctx, comp_ctx), @@ -91,7 +87,7 @@ impl Connector for ConnectorVariant { pub(crate) struct ScheduledConnector { pub connector: ConnectorVariant, // access by connector - pub ctx_fancy: ComponentCtxFancy, + pub ctx_fancy: ComponentCtx, pub public: ConnectorPublic, // accessible by all schedulers and connectors pub router: ControlMessageHandler, pub shutting_down: bool, @@ -246,7 +242,7 @@ impl RuntimeInner { /// Sends a message to a particular connector. If the connector happened to /// be sleeping then it will be scheduled for execution. - pub(crate) fn send_message(&self, target_id: ConnectorId, message: MessageFancy) { + pub(crate) fn send_message(&self, target_id: ConnectorId, message: Message) { let target = self.get_component_public(target_id); target.inbox.insert_message(message); @@ -412,7 +408,7 @@ impl ConnectorStore { fn create(&mut self, connector: ConnectorVariant, initially_sleeping: bool) -> ConnectorKey { let mut connector = ScheduledConnector { connector, - ctx_fancy: ComponentCtxFancy::new_empty(), + ctx_fancy: ComponentCtx::new_empty(), public: ConnectorPublic::new(initially_sleeping), router: ControlMessageHandler::new(), shutting_down: false, diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 0b6b5fdaf98b09d9f591a2b2b7cbe80e438e1b25..daeeff3c14f8cef0346939cb005260948010bd11 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -6,11 +6,11 @@ use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; use super::{ConnectorKey, ConnectorId, RuntimeInner}; -use super::scheduler::{SchedulerCtx, ComponentCtxFancy}; +use super::scheduler::{SchedulerCtx, ComponentCtx}; use super::port::{Port, PortIdLocal, Channel, PortKind}; use super::consensus::find_ports_in_value_group; -use super::connector2::{ConnectorScheduling, ConnectorPDL}; -use super::inbox2::{MessageFancy, ControlContent, ControlMessageFancy}; +use super::connector::{ConnectorScheduling, ConnectorPDL}; +use super::inbox::{Message, ControlContent, ControlMessage}; /// Generic connector interface from the scheduler's point of view. pub(crate) trait Connector { @@ -18,7 +18,7 @@ pub(crate) trait Connector { /// One should generally request and handle new messages from the component /// context. Then perform any logic the component has to do, and in the /// process perhaps queue up some state changes using the same context. - fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling; + fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling; } type SyncDone = Arc<(Mutex, Condvar)>; @@ -53,13 +53,13 @@ impl ConnectorApplication { } impl Connector for ConnectorApplication { - fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { // Handle any incoming messages if we're participating in a round while let Some(message) = comp_ctx.read_next_message() { match message { - MessageFancy::Data(_) => todo!("data message in API connector"), - MessageFancy::Sync(_) => todo!("sync message in API connector"), - MessageFancy::Control(_) => todo!("impossible control message"), + Message::Data(_) => todo!("data message in API connector"), + Message::Sync(_) => todo!("sync message in API connector"), + Message::Control(_) => todo!("impossible control message"), } } @@ -185,7 +185,7 @@ impl ApplicationInterface { fn wake_up_connector_with_ping(&self) { let connector = self.runtime.get_component_public(self.connector_id); - connector.inbox.insert_message(MessageFancy::Control(ControlMessageFancy{ + connector.inbox.insert_message(Message::Control(ControlMessage { id: 0, sending_component_id: self.connector_id, content: ControlContent::Ping, diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs deleted file mode 100644 index ee19e6b5a17562f26a9d074195fb4bb18462b9d5..0000000000000000000000000000000000000000 --- a/src/runtime2/runtime.rs +++ /dev/null @@ -1,1146 +0,0 @@ -use std::sync::Arc; -use std::collections::{HashMap, VecDeque}; -use std::collections::hash_map::{Entry}; - -use crate::{Polarity, PortId}; -use crate::common::Id; -use crate::protocol::*; -use crate::protocol::eval::*; - -use super::messages::*; - -#[derive(Debug)] -pub enum AddComponentError { - ModuleDoesNotExist, - ConnectorDoesNotExist, - InvalidArgumentType(usize), // value is index of (first) invalid argument -} - -pub(crate) struct PortDesc { - id: u32, - peer_id: u32, - owning_connector_id: Option, - is_getter: bool, // otherwise one can only call `put` -} - -pub(crate) struct ConnectorDesc { - id: u32, - in_sync: bool, - pub(crate) branches: Vec, // first one is always non-speculative one - spec_branches_active: VecDeque, // branches that can be run immediately - spec_branches_pending_receive: HashMap>, // from port_id to branch index - spec_branches_done: Vec, - last_checked_done: u32, - global_inbox: ConnectorInbox, - global_outbox: ConnectorOutbox, -} - -impl ConnectorDesc { - /// Creates a new connector description. Implicit assumption is that there - /// is one (non-sync) branch that can be immediately executed. - fn new(id: u32, component_state: ComponentState, owned_ports: Vec) -> Self { - Self{ - id, - in_sync: false, - branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)], - spec_branches_active: VecDeque::new(), - spec_branches_pending_receive: HashMap::new(), - spec_branches_done: Vec::new(), - last_checked_done: 0, - global_inbox: ConnectorInbox::new(), - global_outbox: ConnectorOutbox::new(), - } - } -} - -#[derive(Debug, PartialEq, Eq)] -pub(crate) enum BranchState { - RunningNonSync, // regular running non-speculative branch - RunningSync, // regular running speculative branch - BranchPoint, // branch which ended up being a branching point - ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution - Failed, // branch that became inconsistent - Finished, // branch (necessarily non-sync) that reached end of code -} - -#[derive(Debug, Clone)] -struct BranchPortDesc { - last_registered_index: Option, // if putter, then last sent branch ID, if getter, then last received branch ID - num_times_fired: u32, // number of puts/gets on this port -} - -struct BranchContext { - just_called_did_put: bool, - pending_channel: Option<(Value, Value)>, -} - -pub(crate) struct BranchDesc { - index: u32, - parent_index: Option, - pub(crate) code_state: ComponentState, - pub(crate) branch_state: BranchState, - owned_ports: Vec, - message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value - port_mapping: HashMap, - branch_context: BranchContext, -} - -impl BranchDesc { - /// Creates the first non-sync branch of a connector - fn new_non_sync(component_state: ComponentState, owned_ports: Vec) -> Self { - Self{ - index: 0, - parent_index: None, - code_state: component_state, - branch_state: BranchState::RunningNonSync, - owned_ports, - message_inbox: HashMap::new(), - port_mapping: HashMap::new(), - branch_context: BranchContext{ - just_called_did_put: false, - pending_channel: None, - } - } - } - - /// Creates a sync branch based on the supplied branch. This supplied branch - /// is the branching point for the new one, i.e. the parent in the branching - /// tree. - fn new_sync_from(index: u32, branch_state: &BranchDesc) -> Self { - // We expect that the given branche's context is not halfway handling a - // `put(...)` or a `channel x -> y` statement. - debug_assert!(!branch_state.branch_context.just_called_did_put); - debug_assert!(branch_state.branch_context.pending_channel.is_none()); - - Self{ - index, - parent_index: Some(branch_state.index), - code_state: branch_state.code_state.clone(), - branch_state: BranchState::RunningSync, - owned_ports: branch_state.owned_ports.clone(), - message_inbox: branch_state.message_inbox.clone(), - port_mapping: branch_state.port_mapping.clone(), - branch_context: BranchContext{ - just_called_did_put: false, - pending_channel: None, - } - } - } -} - -#[derive(PartialEq, Eq)] -enum Scheduling { - Immediate, - Later, - NotNow, -} - -#[derive(Clone, Copy, Eq, PartialEq, Debug)] -enum ProposedBranchConstraint { - SilentPort(u32), // port id - BranchNumber(u32), // branch id - PortMapping(u32, u32), // particular port's mapped branch number -} - -// Local solution of the connector -#[derive(Clone)] -struct ProposedConnectorSolution { - final_branch_id: u32, - all_branch_ids: Vec, // the final branch ID and, recursively, all parents - port_mapping: HashMap>, // port IDs of the connector, mapped to their branch IDs (None for silent ports) -} - -#[derive(Clone)] -struct ProposedSolution { - connector_mapping: HashMap, // from connector ID to branch ID - connector_constraints: HashMap>, // from connector ID to encountered branch numbers - remaining_connectors: Vec, // connectors that still need to be visited -} - -// TODO: @performance, use freelists+ids instead of HashMaps -pub struct Runtime { - protocol: Arc, - pub(crate) ports: HashMap, - port_counter: u32, - pub(crate) connectors: HashMap, - connector_counter: u32, - connectors_active: VecDeque, -} - -impl Runtime { - pub fn new(pd: Arc) -> Self { - Self{ - protocol: pd, - ports: HashMap::new(), - port_counter: 0, - connectors: HashMap::new(), - connector_counter: 0, - connectors_active: VecDeque::new(), - } - } - - /// Creates a new channel that is not owned by any connector and returns its - /// endpoints. The returned values are of the (putter port, getter port) - /// respectively. - pub fn add_channel(&mut self) -> (Value, Value) { - let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, None); - return ( - port_value_from_id(None, put_id, true), - port_value_from_id(None, get_id, false) - ); - } - - pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> { - use AddComponentError as ACE; - use crate::runtime::error::AddComponentError as OldACE; - - // TODO: Remove the responsibility of adding a component from the PD - - // Lookup module and the component - // TODO: Remove this error enum translation. Note that for now this - // function forces port-only arguments - let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) { - Ok(polarities) => polarities, - Err(reason) => match reason { - OldACE::NonPortTypeParameters => return Err(ACE::InvalidArgumentType(0)), - OldACE::NoSuchModule => return Err(ACE::ModuleDoesNotExist), - OldACE::NoSuchComponent => return Err(ACE::ModuleDoesNotExist), - _ => unreachable!(), - } - }; - - // Make sure supplied values (and types) are correct. At the same time - // modify the port IDs such that they contain the ID of the connector - // we're about the create. - let component_id = self.generate_connector_id(); - let mut ports = Vec::with_capacity(values.values.len()); - - for (value_idx, value) in values.values.iter().enumerate() { - let polarity = &port_polarities[value_idx]; - - match value { - Value::Input(port_id) => { - if *polarity != Polarity::Getter { - return Err(ACE::InvalidArgumentType(value_idx)) - } - - ports.push(PortId(Id{ - connector_id: component_id, - u32_suffix: port_id.0.u32_suffix, - })); - }, - Value::Output(port_id) => { - if *polarity != Polarity::Putter { - return Err(ACE::InvalidArgumentType(value_idx)) - } - - ports.push(PortId(Id{ - connector_id: component_id, - u32_suffix: port_id.0.u32_suffix - })); - }, - _ => return Err(ACE::InvalidArgumentType(value_idx)) - } - } - - // Instantiate the component, and mark the ports as being owned by the - // newly instantiated component - let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports); - let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); - - for port in &ports { - let desc = self.ports.get_mut(port).unwrap(); - desc.owning_connector_id = Some(component_id); - } - - self.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports)); - self.connectors_active.push_back(component_id); - - Ok(()) - } - - pub fn run(&mut self) { - // Go through all active connectors - while !self.connectors_active.is_empty() { - // Run a single connector until it indicates we can run another - // connector - let next_id = self.connectors_active.pop_front().unwrap(); - let mut scheduling = Scheduling::Immediate; - - while scheduling == Scheduling::Immediate { - scheduling = self.run_connector(next_id); - } - - match scheduling { - Scheduling::Immediate => unreachable!(), - Scheduling::Later => self.connectors_active.push_back(next_id), - Scheduling::NotNow => {}, - } - - // Deal with any outgoing messages and potential solutions - self.empty_connector_outbox(next_id); - self.check_connector_new_solutions(next_id); - } - } - - /// Runs a connector for as long as sensible, then returns `true` if the - /// connector should be run again in the future, and return `false` if the - /// connector has terminated. Note that a terminated connector still - /// requires cleanup. - fn run_connector(&mut self, connector_id: u32) -> Scheduling { - let desc = self.connectors.get_mut(&connector_id).unwrap(); - - if desc.in_sync { - return self.run_connector_sync_mode(connector_id); - } else { - return self.run_connector_regular_mode(connector_id); - } - } - - #[inline] - fn run_connector_sync_mode(&mut self, connector_id: u32) -> Scheduling { - // Retrieve connector and branch that is supposed to be run - let desc = self.connectors.get_mut(&connector_id).unwrap(); - debug_assert!(desc.in_sync); - debug_assert!(!desc.spec_branches_active.is_empty()); - - let branch_index = desc.spec_branches_active.pop_front().unwrap(); - let branch = &mut desc.branches[branch_index as usize]; - debug_assert_eq!(branch_index, branch.index); - - // Run this particular branch to a next blocking point - let mut run_context = Context{ - inbox: &branch.message_inbox, - port_mapping: &branch.port_mapping, - branch_ctx: &mut branch.branch_context, - }; - - let run_result = branch.code_state.run(&mut run_context, &self.protocol); - - match run_result { - RunResult::BranchInconsistent => { - // Speculative branch became inconsistent. So we don't - // run it again - branch.branch_state = BranchState::Failed; - }, - RunResult::BranchMissingPortState(port_id) => { - // Branch called `fires()` on a port that did not have a - // value assigned yet. So branch and keep running. - debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); - debug_assert!(branch.port_mapping.get(&port_id).is_none()); - - let mut copied_branch = Self::duplicate_branch(desc, branch_index); - let copied_index = copied_branch.index; - - copied_branch.port_mapping.insert(port_id, BranchPortDesc{ - last_registered_index: None, - num_times_fired: 1, - }); - - let branch = &mut desc.branches[branch_index as usize]; // need to reborrow - branch.port_mapping.insert(port_id, BranchPortDesc{ - last_registered_index: None, - num_times_fired: 0, - }); - - // Run both again - desc.branches.push(copied_branch); - desc.spec_branches_active.push_back(branch_index); - desc.spec_branches_active.push_back(copied_index); - - return Scheduling::Immediate; - }, - RunResult::BranchMissingPortValue(port_id) => { - // Branch just performed a `get()` on a port that did - // not yet receive a value. - - // First check if a port value is assigned to the - // current branch. If so, check if it is consistent. - debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); - let mut insert_in_pending_receive = false; - - match branch.port_mapping.entry(port_id) { - Entry::Vacant(entry) => { - // No entry yet, so force to firing - entry.insert(BranchPortDesc{ - last_registered_index: None, - num_times_fired: 1, - }); - branch.branch_state = BranchState::BranchPoint; - insert_in_pending_receive = true; - }, - Entry::Occupied(entry) => { - // Have an entry, check if it is consistent - let entry = entry.get(); - if entry.num_times_fired == 0 { - // Inconsistent - branch.branch_state = BranchState::Failed; - } else { - // Perfectly fine, add to queue - debug_assert!(entry.last_registered_index.is_none()); - assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now"); - branch.branch_state = BranchState::BranchPoint; - insert_in_pending_receive = true; - } - } - } - - if insert_in_pending_receive { - // Perform the insert - match desc.spec_branches_pending_receive.entry(port_id) { - Entry::Vacant(entry) => { - entry.insert(vec![branch_index]); - } - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - debug_assert!(!entry.contains(&branch_index)); - entry.push(branch_index); - } - } - - // But also check immediately if we don't have a - // previously received message. If so, we - // immediately branch and accept the message - if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) { - for message in messages { - let mut new_branch = Self::duplicate_branch(desc, branch_index); - let new_branch_idx = new_branch.index; - let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap(); - new_port_desc.last_registered_index = Some(message.peer_cur_branch_id); - new_branch.message_inbox.insert((port_id, 1), message.message.clone()); - - desc.branches.push(new_branch); - desc.spec_branches_active.push_back(new_branch_idx); - } - - if !messages.is_empty() { - return Scheduling::Immediate; - } - } - } - }, - RunResult::BranchAtSyncEnd => { - // Check the branch for any ports that were not used and - // insert them in the port mapping as not having fired. - for port_id in branch.owned_ports.iter().copied() { - let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_id }); - if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) { - entry.insert(BranchPortDesc { - last_registered_index: None, - num_times_fired: 0 - }); - } - } - - // Mark the branch as being done - branch.branch_state = BranchState::ReachedEndSync; - desc.spec_branches_done.push(branch_index); - }, - RunResult::BranchPut(port_id, value_group) => { - debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); - debug_assert_eq!(value_group.values.len(), 1); // can only send one value - - // Branch just performed a `put()`. Check if we have - // assigned the port value and if so, if it is - // consistent. - let mut can_put = true; - branch.branch_context.just_called_did_put = true; - match branch.port_mapping.entry(port_id) { - Entry::Vacant(entry) => { - // No entry yet - entry.insert(BranchPortDesc{ - last_registered_index: Some(branch.index), - num_times_fired: 1, - }); - }, - Entry::Occupied(mut entry) => { - // Pre-existing entry - let entry = entry.get_mut(); - if entry.num_times_fired == 0 { - // This is 'fine' in the sense that we have - // a normal inconsistency in the branch. - branch.branch_state = BranchState::Failed; - can_put = false; - } else if entry.last_registered_index.is_none() { - // A put() that follows a fires() - entry.last_registered_index = Some(branch.index); - } else { - // This should be fine in the future. But - // for now we throw an error as it doesn't - // mesh well with the 'fires()' concept. - todo!("throw an error of some sort, then fail all related") - } - } - } - - if can_put { - // Actually put the message in the outbox - let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap(); - debug_assert_eq!(port_desc.owning_connector_id.unwrap(), connector_id); - let peer_id = port_desc.peer_id; - let peer_desc = self.ports.get(&peer_id).unwrap(); - debug_assert!(peer_desc.owning_connector_id.is_some()); - - let peer_id = PortId(Id{ - connector_id: peer_desc.owning_connector_id.unwrap(), - u32_suffix: peer_id - }); - - // For now this is the one and only time we're going - // to send a message. So for now we can't send a - // branch ID. - desc.global_outbox.insert_message(BufferedMessage{ - sending_port: port_id, - receiving_port: peer_id, - peer_prev_branch_id: None, - peer_cur_branch_id: branch_index, - message: value_group, - }); - - // Finally, because we were able to put the message, - // we can run the branch again - desc.spec_branches_active.push_back(branch_index); - return Scheduling::Immediate; - } - }, - _ => unreachable!("got result '{:?}' from running component in sync mode", run_result), - } - - // Did not return that we need to immediately schedule again, so - // determine if we want to do so based on the current number of active - // speculative branches - if desc.spec_branches_active.is_empty() { - return Scheduling::NotNow; - } else { - return Scheduling::Later; - } - } - - #[inline] - fn run_connector_regular_mode(&mut self, connector_id: u32) -> Scheduling { - // Retrieve the connector and the branch (which is always the first one, - // since we assume we're not running in sync-mode). - let desc = self.connectors.get_mut(&connector_id).unwrap(); - debug_assert!(!desc.in_sync); - debug_assert!(desc.spec_branches_active.is_empty()); - debug_assert_eq!(desc.branches.len(), 1); - - let branch = &mut desc.branches[0]; - - // Run this branch to its blocking point - let mut run_context = Context{ - inbox: &branch.message_inbox, - port_mapping: &branch.port_mapping, - branch_ctx: &mut branch.branch_context, - }; - let run_result = branch.code_state.run(&mut run_context, &self.protocol); - - match run_result { - RunResult::ComponentTerminated => { - branch.branch_state = BranchState::Finished; - return Scheduling::NotNow - }, - RunResult::ComponentAtSyncStart => { - // Prepare for sync execution - Self::prepare_branch_for_sync(desc); - return Scheduling::Immediate; - }, - RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { - // Find all references to ports in the provided arguments, the - // ownership of these ports will be transferred to the connector - // we're about to create. - let mut ports = Vec::with_capacity(arguments.values.len()); - find_ports_in_value_group(&arguments, &mut ports); - - // Generate a new connector with its own state - let new_component_id = self.generate_connector_id(); - let new_component_state = ComponentState { - prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments) - }; - - for port_id in &ports { - let port = self.ports.get_mut(&port_id.0.u32_suffix).unwrap(); - debug_assert_eq!(port.owning_connector_id.unwrap(), connector_id); - port.owning_connector_id = Some(new_component_id) - } - - // Finally push the new connector into the registry - let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); - self.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports)); - self.connectors_active.push_back(new_component_id); - - return Scheduling::Immediate; - }, - RunResult::NewChannel => { - // Prepare channel - debug_assert!(run_context.branch_ctx.pending_channel.is_none()); - let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, Some(connector_id)); - run_context.branch_ctx.pending_channel = Some(( - port_value_from_id(Some(connector_id), put_id, true), - port_value_from_id(Some(connector_id), get_id, false) - )); - - return Scheduling::Immediate; - }, - _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result), - } - } - - /// Puts all the messages that are currently in the outbox of a particular - /// connector into the inbox of the receivers. If possible then branches - /// will be created that receive those messages. - fn empty_connector_outbox(&mut self, connector_index: u32) { - loop { - let connector = self.connectors.get_mut(&connector_index).unwrap(); - let message_to_send = connector.global_outbox.take_next_message_to_send(); - - if message_to_send.is_none() { - return; - } - - // We have a message to send - let message_to_send = message_to_send.unwrap(); - - // Lookup the target connector - let target_port = message_to_send.receiving_port; - let port_desc = self.ports.get(&target_port.0.u32_suffix).unwrap(); - debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id); - let target_connector_id = port_desc.owning_connector_id.unwrap(); - let target_connector = self.connectors.get_mut(&target_connector_id).unwrap(); - - // In any case, always put the message in the global inbox - target_connector.global_inbox.insert_message(message_to_send.clone()); - - // Check if there are any branches that are waiting on - // receives - if let Some(branch_indices) = target_connector.spec_branches_pending_receive.get(&target_port) { - // Check each of the branches for a port mapping that - // matches the one on the message header - for branch_index in branch_indices { - let branch = &mut target_connector.branches[*branch_index as usize]; - debug_assert_eq!(branch.branch_state, BranchState::BranchPoint); - - let mut can_branch = false; - - if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) { - if port_desc.last_registered_index == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 { - can_branch = true; - } - } - - if can_branch { - // Put the message inside a clone of the currently - // waiting branch - let mut new_branch = Self::duplicate_branch(target_connector, *branch_index); - let new_branch_idx = new_branch.index; - let new_port_desc = &mut new_branch.port_mapping.get_mut(&message_to_send.receiving_port).unwrap(); - new_port_desc.last_registered_index = Some(message_to_send.peer_cur_branch_id); - new_branch.message_inbox.insert((message_to_send.receiving_port, 1), message_to_send.message.clone()); - - // And queue the branch for further execution - target_connector.branches.push(new_branch); - target_connector.spec_branches_active.push_back(new_branch_idx); - if !self.connectors_active.contains(&target_connector.id) { - self.connectors_active.push_back(target_connector.id); - } - } - } - } - } - } - - /// Checks a connector for the submitted solutions. After all neighbouring - /// connectors have been checked all of their "last checked solution" index - /// will be incremented. - fn check_connector_new_solutions(&mut self, connector_id: u32) { - // Take connector and start processing its solutions - loop { - let connector = self.connectors.get_mut(&connector_id).unwrap(); - if connector.last_checked_done == connector.spec_branches_done.len() as u32 { - // Nothing to do - return; - } - - // We have a new solution - let start_branch_index = connector.spec_branches_done[connector.last_checked_done as usize]; - connector.last_checked_done += 1; - - // Check the connector+branch combination to see if a global - // solution has already been found - if let Some(global_solution) = self.check_connector_solution(connector_id, start_branch_index) { - // Found a global solution, apply it to all the connectors that - // participate - for (connector_id, local_solution) in global_solution.connector_mapping { - self.commit_connector_solution(connector_id, local_solution.final_branch_id); - } - } - } - } - - fn check_connector_solution(&mut self, first_connector_index: u32, first_branch_index: u32) -> Option { - // Take the connector and branch of interest - let first_connector = self.connectors.get(&first_connector_index).unwrap(); - let first_branch = &first_connector.branches[first_branch_index as usize]; - debug_assert_eq!(first_branch.branch_state, BranchState::ReachedEndSync); - - // Setup the first solution - let mut first_solution = ProposedSolution{ - connector_mapping: HashMap::new(), - connector_constraints: HashMap::new(), - remaining_connectors: Vec::new(), - }; - let mut first_local_solution = ProposedConnectorSolution{ - final_branch_id: first_branch.index, - all_branch_ids: Vec::new(), - port_mapping: first_branch.port_mapping - .iter() - .map(|(port_id, port_info)| { - (port_id.0.u32_suffix, port_info.last_registered_index) - }) - .collect(), - }; - self.determine_branch_ids(first_connector, first_branch.index, &mut first_local_solution.all_branch_ids); - first_solution.connector_mapping.insert(first_connector.id, first_local_solution); - - for (port_id, port_mapping) in first_branch.port_mapping.iter() { - let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap(); - let peer_port_id = port_desc.peer_id; - let peer_port_desc = self.ports.get(&peer_port_id).unwrap(); - let peer_connector_id = peer_port_desc.owning_connector_id.unwrap(); - - let constraint = match port_mapping.last_registered_index { - Some(branch_id) => ProposedBranchConstraint::BranchNumber(branch_id), - None => ProposedBranchConstraint::SilentPort(peer_port_id), - }; - - match first_solution.connector_constraints.entry(peer_connector_id) { - Entry::Vacant(entry) => { - // Not yet encountered - entry.insert(vec![constraint]); - first_solution.remaining_connectors.push(peer_connector_id); - }, - Entry::Occupied(mut entry) => { - // Already encountered - let entry = entry.get_mut(); - if !entry.contains(&constraint) { - entry.push(constraint); - } - } - } - } - - // Setup storage for all possible solutions - let mut all_solutions = Vec::new(); - all_solutions.push(first_solution); - - while !all_solutions.is_empty() { - let mut cur_solution = all_solutions.pop().unwrap(); - - if cur_solution.remaining_connectors.is_empty() { - // All connectors have been visited, so commit the solution - debug_assert!(cur_solution.connector_constraints.is_empty()); - return Some(cur_solution); - } else { - // Not all connectors have been visited yet, so take one of the - // connectors and visit it. - let target_connector = cur_solution.remaining_connectors.pop().unwrap(); - self.merge_solution_with_connector(&mut cur_solution, &mut all_solutions, target_connector); - } - } - - // No satisfying solution found - return None; - } - - fn merge_solution_with_connector(&self, cur_solution: &mut ProposedSolution, all_solutions: &mut Vec, target_connector: u32) { - debug_assert!(!cur_solution.connector_mapping.contains_key(&target_connector)); // not yet visited - debug_assert!(cur_solution.connector_constraints.contains_key(&target_connector)); // but we encountered a reference to it - - let branch_constraints = cur_solution.connector_constraints.get(&target_connector).unwrap(); - let cur_connector = self.connectors.get(&target_connector).unwrap(); - - // Make sure all propositions are unique - for i in 0..branch_constraints.len() { - let proposition_i = branch_constraints[i]; - for j in 0..i { - let proposition_j = branch_constraints[j]; - debug_assert_ne!(proposition_i, proposition_j); - } - } - - // Go through the current connector's branches that have finished - 'branch_loop: for finished_branch_idx in cur_connector.spec_branches_done.iter().copied() { - let finished_branch = &cur_connector.branches[finished_branch_idx as usize]; - - // Construct a list of all the parent branch numbers - let mut parent_branch_ids = Vec::new(); - self.determine_branch_ids(cur_connector, finished_branch_idx, &mut parent_branch_ids); - - // Go through all constraints and make sure they are satisfied by - // the current branch - let mut all_constraints_satisfied = true; - - for constraint in branch_constraints { - match constraint { - ProposedBranchConstraint::SilentPort(port_id) => { - // Specified should have remained silent - let port_id = PortId(Id{ - connector_id: target_connector, - u32_suffix: *port_id, - }); - debug_assert!(finished_branch.port_mapping.contains_key(&port_id)); - let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap(); - all_constraints_satisfied = all_constraints_satisfied && mapped_port.num_times_fired == 0; - }, - ProposedBranchConstraint::BranchNumber(branch_id) => { - // Branch number should have appeared in the - // predecessor branches. - all_constraints_satisfied = all_constraints_satisfied && parent_branch_ids.contains(branch_id); - }, - ProposedBranchConstraint::PortMapping(port_id, branch_id) => { - // Port should map to a particular branch number - let port_id = PortId(Id{ - connector_id: target_connector, - u32_suffix: *port_id, - }); - debug_assert!(finished_branch.port_mapping.contains_key(&port_id)); - let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap(); - all_constraints_satisfied = all_constraints_satisfied && mapped_port.last_registered_index == Some(*branch_id); - } - } - - if !all_constraints_satisfied { - break; - } - } - - if !all_constraints_satisfied { - continue; - } - - // If here, then all constraints on the finished branch are - // satisfied. But the finished branch also puts constraints on the - // other connectors. So either: - // 1. Add them to the list of constraints a peer connector should - // adhere to. - // 2. Make sure that the provided connector solution matches the - // constraints imposed by the currently considered finished branch - // - // To make our lives a bit easier we already insert our proposed - // local solution into a prepared global solution. This makes - // looking up remote ports easier (since the channel might have its - // two ends owned by the same connector). - let mut new_solution = cur_solution.clone(); - debug_assert!(!new_solution.remaining_connectors.contains(&target_connector)); - new_solution.connector_constraints.remove(&target_connector); - new_solution.connector_mapping.insert(target_connector, ProposedConnectorSolution{ - final_branch_id: finished_branch.index, - all_branch_ids: parent_branch_ids, - port_mapping: finished_branch.port_mapping - .iter() - .map(|(port_id, port_desc)| { - (port_id.0.u32_suffix, port_desc.last_registered_index) - }) - .collect(), - }); - - for (local_port_id, port_desc) in &finished_branch.port_mapping { - // Retrieve port of peer - let port_info = self.ports.get(&local_port_id.0.u32_suffix).unwrap(); - let peer_port_id = port_info.peer_id; - let peer_port_info = self.ports.get(&peer_port_id).unwrap(); - let peer_connector_id = peer_port_info.owning_connector_id.unwrap(); - - // If the connector was not present in the new global solution - // yet, add it now, as it simplifies the following logic - if !new_solution.connector_mapping.contains_key(&peer_connector_id) && !new_solution.remaining_connectors.contains(&peer_connector_id) { - new_solution.connector_constraints.insert(peer_connector_id, Vec::new()); - new_solution.remaining_connectors.push(peer_connector_id); - } - - if new_solution.remaining_connectors.contains(&peer_connector_id) { - // Constraint applies to a connector that has not yet been - // visited - debug_assert!(new_solution.connector_constraints.contains_key(&peer_connector_id)); - debug_assert_ne!(peer_connector_id, target_connector); - - let new_constraint = if port_desc.num_times_fired == 0 { - ProposedBranchConstraint::SilentPort(peer_port_id) - } else if peer_port_info.is_getter { - // Peer port is a getter, so we want its port to map to - // the branch number in our port mapping. - debug_assert!(port_desc.last_registered_index.is_some()); - ProposedBranchConstraint::PortMapping(peer_port_id, port_desc.last_registered_index.unwrap()) - } else { - // Peer port is a putter, so we want to restrict the - // solution's run to contain the branch ID we received. - ProposedBranchConstraint::BranchNumber(port_desc.last_registered_index.unwrap()) - }; - - let peer_constraints = new_solution.connector_constraints.get_mut(&peer_connector_id).unwrap(); - if !peer_constraints.contains(&new_constraint) { - peer_constraints.push(new_constraint); - } - } else { - // Constraint applies to an already visited connector - let peer_solution = new_solution.connector_mapping.get(&peer_connector_id).unwrap(); - if port_desc.num_times_fired == 0 { - let peer_mapped_id = peer_solution.port_mapping.get(&peer_port_id).unwrap(); - if peer_mapped_id.is_some() { - all_constraints_satisfied = false; - break; - } - } else if peer_port_info.is_getter { - // Peer is getter, so its port should be mapped to one - // of our branch IDs. To simplify lookup we look at the - // last message we sent to the getter. - debug_assert!(port_desc.last_registered_index.is_some()); - let peer_port = peer_solution.port_mapping.get(&peer_port_id) - .map_or(None, |v| *v); - - if port_desc.last_registered_index != peer_port { - // No match - all_constraints_satisfied = false; - break; - } - } else { - // Peer is putter, so we expect to find our port mapping - // to match one of the branch numbers in the peer - // connector's local solution - debug_assert!(port_desc.last_registered_index.is_some()); - let expected_branch_id = port_desc.last_registered_index.unwrap(); - - if !peer_solution.all_branch_ids.contains(&expected_branch_id) { - all_constraints_satisfied = false; - break; - } - } - } - } - - if !all_constraints_satisfied { - // Final checks failed - continue 'branch_loop - } - - // We're sure that this branch matches the provided solution, so - // push it onto the list of considered solutions - all_solutions.push(new_solution); - } - } - - fn commit_connector_solution(&mut self, connector_id: u32, branch_id: u32) { - // Retrieve connector and branch - let connector = self.connectors.get_mut(&connector_id).unwrap(); - debug_assert_ne!(branch_id, 0); // because at 0 we have our initial backed-up non-sync branch - debug_assert!(connector.in_sync); - debug_assert!(connector.spec_branches_done.contains(&branch_id)); - - // Put the selected solution in front, the branch at index 0 is the - // "non-sync" branch. - connector.branches.swap(0, branch_id as usize); - connector.branches.truncate(1); - - // And reset the connector's state for further execution - connector.in_sync = false; - connector.spec_branches_active.clear(); - connector.spec_branches_pending_receive.clear(); - connector.spec_branches_done.clear(); - connector.last_checked_done = 0; - connector.global_inbox.clear(); - connector.global_outbox.clear(); - - // Do the same thing for the final selected branch - let final_branch = &mut connector.branches[0]; - final_branch.index = 0; - final_branch.parent_index = None; - debug_assert_eq!(final_branch.branch_state, BranchState::ReachedEndSync); - final_branch.branch_state = BranchState::RunningNonSync; - final_branch.message_inbox.clear(); - final_branch.port_mapping.clear(); - - // Might be that the connector was no longer running, if so, put it back - // in the list of connectors to run - if !self.connectors_active.contains(&connector_id) { - self.connectors_active.push_back(connector_id); - } - } - - fn generate_connector_id(&mut self) -> u32 { - let id = self.connector_counter; - self.connector_counter += 1; - return id; - } - - // ------------------------------------------------------------------------- - // Helpers for port management - // ------------------------------------------------------------------------- - - #[inline] - fn add_owned_channel(ports: &mut HashMap, port_counter: &mut u32, owning_connector_id: Option) -> (u32, u32) { - let get_id = *port_counter; - let put_id = *port_counter + 1; - (*port_counter) += 2; - - ports.insert(get_id, PortDesc{ - id: get_id, - peer_id: put_id, - owning_connector_id, - is_getter: true, - }); - ports.insert(put_id, PortDesc{ - id: put_id, - peer_id: get_id, - owning_connector_id, - is_getter: false, - }); - - return (put_id, get_id); - } - - // ------------------------------------------------------------------------- - // Helpers for branch management - // ------------------------------------------------------------------------- - - /// Prepares a speculative branch for further execution from the connector's - /// non-speculative base branch. - fn prepare_branch_for_sync(desc: &mut ConnectorDesc) { - // Ensure only one branch is active, the non-sync branch - debug_assert!(!desc.in_sync); - debug_assert_eq!(desc.branches.len(), 1); - debug_assert!(desc.spec_branches_active.is_empty()); - let new_branch_index = 1; - - // Push first speculative branch as active branch - let new_branch = BranchDesc::new_sync_from(new_branch_index, &desc.branches[0]); - desc.branches.push(new_branch); - desc.spec_branches_active.push_back(new_branch_index); - desc.in_sync = true; - } - - /// Duplicates a particular (speculative) branch and returns it. Due to - /// borrowing rules in code that uses this helper the returned branch still - /// needs to be pushed onto the member `branches`. - fn duplicate_branch(desc: &ConnectorDesc, original_branch_idx: u32) -> BranchDesc { - let original_branch = &desc.branches[original_branch_idx as usize]; - debug_assert!(desc.in_sync); - - let copied_index = desc.branches.len() as u32; - let copied_branch = BranchDesc::new_sync_from(copied_index, original_branch); - - return copied_branch; - } - - /// Retrieves all parent IDs of a particular branch. These numbers run from - /// the leaf towards the parent. - fn determine_branch_ids(&self, desc: &ConnectorDesc, first_branch_index: u32, result: &mut Vec) { - let mut next_branch_index = first_branch_index; - result.clear(); - - loop { - result.push(next_branch_index); - let branch = &desc.branches[next_branch_index as usize]; - - match branch.parent_index { - Some(index) => next_branch_index = index, - None => return, - } - } - } -} - -/// Context accessible by the code while being executed by the runtime. When the -/// code is being executed by the runtime it sometimes needs to interact with -/// the runtime. This is achieved by the "code throwing an error code", after -/// which the runtime modifies the appropriate variables and continues executing -/// the code again. -struct Context<'a> { - // Temporary references to branch related storage - inbox: &'a HashMap<(PortId, u32), ValueGroup>, - port_mapping: &'a HashMap, - branch_ctx: &'a mut BranchContext, -} - -impl<'a> crate::protocol::RunContext for Context<'a> { - fn did_put(&mut self, _port: PortId) -> bool { - // Note that we want "did put" to return false if we have fired zero - // times, because this implies we did a prevous - let old_value = self.branch_ctx.just_called_did_put; - self.branch_ctx.just_called_did_put = false; - return old_value; - } - - fn get(&mut self, port: PortId) -> Option { - let inbox_key = (port, 1); - match self.inbox.get(&inbox_key) { - None => None, - Some(value) => Some(value.clone()), - } - } - - fn fires(&mut self, port: PortId) -> Option { - match self.port_mapping.get(&port) { - None => None, - Some(port_info) => Some(Value::Bool(port_info.num_times_fired != 0)), - } - } - - fn get_channel(&mut self) -> Option<(Value, Value)> { - self.branch_ctx.pending_channel.take() - } -} - -/// Recursively goes through the value group, attempting to find ports. -/// Duplicates will only be added once. -fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { - // Helper to check a value for a port and recurse if needed. - fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { - match value { - Value::Input(port_id) | Value::Output(port_id) => { - // This is an actual port - for prev_port in ports.iter() { - if prev_port == port_id { - // Already added - return; - } - } - - ports.push(*port_id); - }, - Value::Array(heap_pos) | - Value::Message(heap_pos) | - Value::String(heap_pos) | - Value::Struct(heap_pos) | - Value::Union(_, heap_pos) => { - // Reference to some dynamic thing which might contain ports, - // so recurse - let heap_region = &group.regions[*heap_pos as usize]; - for embedded_value in heap_region { - find_port_in_value(group, embedded_value, ports); - } - }, - _ => {}, // values we don't care about - } - } - - // Clear the ports, then scan all the available values - ports.clear(); - for value in &value_group.values { - find_port_in_value(value_group, value, ports); - } -} - -fn port_value_from_id(connector_id: Option, port_id: u32, is_output: bool) -> Value { - let connector_id = connector_id.unwrap_or(u32::MAX); // TODO: @hack, review entire PortId/ConnectorId/Id system - if is_output { - return Value::Output(PortId(Id{ - connector_id, - u32_suffix: port_id - })); - } else { - return Value::Input(PortId(Id{ - connector_id, - u32_suffix: port_id, - })); - } -} \ No newline at end of file diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index adf1a5be8bd2bca197139e5483975ede611e42e7..2a7873e8371a94bc7fc194b93f75d5cdeee02729 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -1,14 +1,13 @@ use std::collections::VecDeque; use std::sync::Arc; use std::sync::atomic::Ordering; -use crate::runtime2::inbox2::ControlContent; use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey}; use super::port::{Port, PortState, PortIdLocal}; use super::native::Connector; use super::branch::{BranchId}; -use super::connector2::{ConnectorPDL, ConnectorScheduling}; -use super::inbox2::{MessageFancy, DataMessageFancy, ControlMessageFancy}; +use super::connector::{ConnectorPDL, ConnectorScheduling}; +use super::inbox::{Message, DataMessage, ControlMessage, ControlContent}; // Because it contains pointers we're going to do a copy by value on this one #[derive(Clone, Copy)] @@ -106,7 +105,7 @@ impl Scheduler { connector_id ); self.debug_conn(connector_id, &format!("Sending message [ exit ] \n --- {:?}", message)); - self.runtime.send_message(port.peer_connector, MessageFancy::Control(message)); + self.runtime.send_message(port.peer_connector, Message::Control(message)); } } @@ -142,7 +141,7 @@ impl Scheduler { self.debug_conn(connector_id, " ... Handling the message"); match message { - MessageFancy::Control(message) => { + Message::Control(message) => { match message.content { ControlContent::PortPeerChanged(port_id, new_target_connector_id) => { // Need to change port target @@ -156,7 +155,7 @@ impl Scheduler { debug_assert!(scheduled.ctx_fancy.outbox.is_empty()); // And respond with an Ack - let ack_message = MessageFancy::Control(ControlMessageFancy{ + let ack_message = Message::Control(ControlMessage { id: message.id, sending_component_id: connector_id, content: ControlContent::Ack, @@ -170,7 +169,7 @@ impl Scheduler { port.state = PortState::Closed; // Send an Ack - let ack_message = MessageFancy::Control(ControlMessageFancy{ + let ack_message = Message::Control(ControlMessage { id: message.id, sending_component_id: connector_id, content: ControlContent::Ack, @@ -204,7 +203,7 @@ impl Scheduler { self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message)); let target_component_id = match &message { - MessageFancy::Data(content) => { + Message::Data(content) => { // Data messages are always sent to a particular port, and // may end up being rerouted. let port_desc = scheduled.ctx_fancy.get_port_by_id(content.data_header.sending_port).unwrap(); @@ -216,14 +215,14 @@ impl Scheduler { port_desc.peer_connector }, - MessageFancy::Sync(content) => { + Message::Sync(content) => { // Sync messages are always sent to a particular component, // the sender must make sure it actually wants to send to // the specified component (and is not using an inconsistent // component ID associated with a port). content.target_component_id }, - MessageFancy::Control(_) => { + Message::Control(_) => { unreachable!("component sending control messages directly"); } }; @@ -269,7 +268,7 @@ impl Scheduler { ); self.debug_conn(connector_id, &format!("Sending message [newcon]\n --- {:?}", reroute_message)); - self.runtime.send_message(port.peer_connector, MessageFancy::Control(reroute_message)); + self.runtime.send_message(port.peer_connector, Message::Control(reroute_message)); } // Schedule new connector to run @@ -330,11 +329,11 @@ impl Scheduler { } #[inline] - fn get_message_target_port(message: &MessageFancy) -> Option { + fn get_message_target_port(message: &Message) -> Option { match message { - MessageFancy::Data(data) => return Some(data.data_header.target_port), - MessageFancy::Sync(_) => {}, - MessageFancy::Control(control) => { + Message::Data(data) => return Some(data.data_header.target_port), + Message::Sync(_) => {}, + Message::Control(control) => { match &control.content { ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id), ControlContent::CloseChannel(port_id) => return Some(*port_id), @@ -377,16 +376,16 @@ pub(crate) struct ComponentPortChange { /// of it by the component itself. When the component starts a sync block or /// exits a sync block the partially managed state by both component and /// scheduler need to be exchanged. -pub(crate) struct ComponentCtxFancy { +pub(crate) struct ComponentCtx { // Mostly managed by the scheduler pub(crate) id: ConnectorId, ports: Vec, - inbox_messages: Vec, // never control or ping messages + inbox_messages: Vec, // never control or ping messages inbox_len_read: usize, // Submitted by the component is_in_sync: bool, changed_in_sync: bool, - outbox: VecDeque, + outbox: VecDeque, state_changes: VecDeque, // Workspaces that may be used by components to (generally) prevent // allocations. Be a good scout and leave it empty after you've used it. @@ -395,7 +394,7 @@ pub(crate) struct ComponentCtxFancy { pub workspace_branches: Vec, } -impl ComponentCtxFancy { +impl ComponentCtx { pub(crate) fn new_empty() -> Self { return Self{ id: ConnectorId::new_invalid(), @@ -457,7 +456,7 @@ impl ComponentCtxFancy { /// Submit a message for the scheduler to send to the appropriate receiver. /// May only be called inside of a sync block. - pub(crate) fn submit_message(&mut self, contents: MessageFancy) { + pub(crate) fn submit_message(&mut self, contents: Message) { debug_assert!(self.is_in_sync); self.outbox.push_back(contents); } @@ -491,7 +490,7 @@ impl ComponentCtxFancy { /// Retrieves the next unread message from the inbox `None` if there are no /// (new) messages to read. // TODO: Fix the clone of the data message, entirely unnecessary - pub(crate) fn read_next_message(&mut self) -> Option { + pub(crate) fn read_next_message(&mut self) -> Option { if !self.is_in_sync { return None; } if self.inbox_len_read == self.inbox_messages.len() { return None; } @@ -501,34 +500,34 @@ impl ComponentCtxFancy { // here. let message = &self.inbox_messages[self.inbox_len_read]; match message { - MessageFancy::Data(content) => { + Message::Data(content) => { self.inbox_len_read += 1; - return Some(MessageFancy::Data(content.clone())); + return Some(Message::Data(content.clone())); }, - MessageFancy::Sync(_) => { + Message::Sync(_) => { let message = self.inbox_messages.remove(self.inbox_len_read); return Some(message); }, - MessageFancy::Control(_) => unreachable!("control message ended up in component inbox"), + Message::Control(_) => unreachable!("control message ended up in component inbox"), } } } pub(crate) struct MessagesIter<'a> { - messages: &'a [MessageFancy], + messages: &'a [Message], next_index: usize, max_index: usize, match_port_id: PortIdLocal, } impl<'a> Iterator for MessagesIter<'a> { - type Item = &'a DataMessageFancy; + type Item = &'a DataMessage; fn next(&mut self) -> Option { // Loop until match is found or at end of messages while self.next_index < self.max_index { let message = &self.messages[self.next_index]; - if let MessageFancy::Data(message) = &message { + if let Message::Data(message) = &message { if message.data_header.target_port == self.match_port_id { // Found a match self.next_index += 1; @@ -593,7 +592,7 @@ impl ControlMessageHandler { pub fn prepare_closing_channel( &mut self, self_port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId - ) -> ControlMessageFancy { + ) -> ControlMessage { let id = self.take_id(); self.active.push(ControlEntry{ @@ -604,7 +603,7 @@ impl ControlMessageHandler { }), }); - return ControlMessageFancy{ + return ControlMessage { id, sending_component_id: self_connector_id, content: ControlContent::CloseChannel(peer_port_id), @@ -619,7 +618,7 @@ impl ControlMessageHandler { port_id: PortIdLocal, peer_port_id: PortIdLocal, self_connector_id: ConnectorId, peer_connector_id: ConnectorId, new_owner_connector_id: ConnectorId - ) -> ControlMessageFancy { + ) -> ControlMessage { let id = self.take_id(); self.active.push(ControlEntry{ @@ -631,7 +630,7 @@ impl ControlMessageHandler { }), }); - return ControlMessageFancy{ + return ControlMessage { id, sending_component_id: self_connector_id, content: ControlContent::PortPeerChanged(peer_port_id, new_owner_connector_id), diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index 79417ff2af586b2530f86f6610348643eea104e5..eec0105e5de7429babadd59ae496d33561481aa1 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use super::*; use crate::{PortId, ProtocolDescription}; use crate::common::Id;