diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs deleted file mode 100644 index ee19e6b5a17562f26a9d074195fb4bb18462b9d5..0000000000000000000000000000000000000000 --- a/src/runtime2/runtime.rs +++ /dev/null @@ -1,1146 +0,0 @@ -use std::sync::Arc; -use std::collections::{HashMap, VecDeque}; -use std::collections::hash_map::{Entry}; - -use crate::{Polarity, PortId}; -use crate::common::Id; -use crate::protocol::*; -use crate::protocol::eval::*; - -use super::messages::*; - -#[derive(Debug)] -pub enum AddComponentError { - ModuleDoesNotExist, - ConnectorDoesNotExist, - InvalidArgumentType(usize), // value is index of (first) invalid argument -} - -pub(crate) struct PortDesc { - id: u32, - peer_id: u32, - owning_connector_id: Option, - is_getter: bool, // otherwise one can only call `put` -} - -pub(crate) struct ConnectorDesc { - id: u32, - in_sync: bool, - pub(crate) branches: Vec, // first one is always non-speculative one - spec_branches_active: VecDeque, // branches that can be run immediately - spec_branches_pending_receive: HashMap>, // from port_id to branch index - spec_branches_done: Vec, - last_checked_done: u32, - global_inbox: ConnectorInbox, - global_outbox: ConnectorOutbox, -} - -impl ConnectorDesc { - /// Creates a new connector description. Implicit assumption is that there - /// is one (non-sync) branch that can be immediately executed. - fn new(id: u32, component_state: ComponentState, owned_ports: Vec) -> Self { - Self{ - id, - in_sync: false, - branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)], - spec_branches_active: VecDeque::new(), - spec_branches_pending_receive: HashMap::new(), - spec_branches_done: Vec::new(), - last_checked_done: 0, - global_inbox: ConnectorInbox::new(), - global_outbox: ConnectorOutbox::new(), - } - } -} - -#[derive(Debug, PartialEq, Eq)] -pub(crate) enum BranchState { - RunningNonSync, // regular running non-speculative branch - RunningSync, // regular running speculative branch - BranchPoint, // branch which ended up being a branching point - ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution - Failed, // branch that became inconsistent - Finished, // branch (necessarily non-sync) that reached end of code -} - -#[derive(Debug, Clone)] -struct BranchPortDesc { - last_registered_index: Option, // if putter, then last sent branch ID, if getter, then last received branch ID - num_times_fired: u32, // number of puts/gets on this port -} - -struct BranchContext { - just_called_did_put: bool, - pending_channel: Option<(Value, Value)>, -} - -pub(crate) struct BranchDesc { - index: u32, - parent_index: Option, - pub(crate) code_state: ComponentState, - pub(crate) branch_state: BranchState, - owned_ports: Vec, - message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value - port_mapping: HashMap, - branch_context: BranchContext, -} - -impl BranchDesc { - /// Creates the first non-sync branch of a connector - fn new_non_sync(component_state: ComponentState, owned_ports: Vec) -> Self { - Self{ - index: 0, - parent_index: None, - code_state: component_state, - branch_state: BranchState::RunningNonSync, - owned_ports, - message_inbox: HashMap::new(), - port_mapping: HashMap::new(), - branch_context: BranchContext{ - just_called_did_put: false, - pending_channel: None, - } - } - } - - /// Creates a sync branch based on the supplied branch. This supplied branch - /// is the branching point for the new one, i.e. the parent in the branching - /// tree. - fn new_sync_from(index: u32, branch_state: &BranchDesc) -> Self { - // We expect that the given branche's context is not halfway handling a - // `put(...)` or a `channel x -> y` statement. - debug_assert!(!branch_state.branch_context.just_called_did_put); - debug_assert!(branch_state.branch_context.pending_channel.is_none()); - - Self{ - index, - parent_index: Some(branch_state.index), - code_state: branch_state.code_state.clone(), - branch_state: BranchState::RunningSync, - owned_ports: branch_state.owned_ports.clone(), - message_inbox: branch_state.message_inbox.clone(), - port_mapping: branch_state.port_mapping.clone(), - branch_context: BranchContext{ - just_called_did_put: false, - pending_channel: None, - } - } - } -} - -#[derive(PartialEq, Eq)] -enum Scheduling { - Immediate, - Later, - NotNow, -} - -#[derive(Clone, Copy, Eq, PartialEq, Debug)] -enum ProposedBranchConstraint { - SilentPort(u32), // port id - BranchNumber(u32), // branch id - PortMapping(u32, u32), // particular port's mapped branch number -} - -// Local solution of the connector -#[derive(Clone)] -struct ProposedConnectorSolution { - final_branch_id: u32, - all_branch_ids: Vec, // the final branch ID and, recursively, all parents - port_mapping: HashMap>, // port IDs of the connector, mapped to their branch IDs (None for silent ports) -} - -#[derive(Clone)] -struct ProposedSolution { - connector_mapping: HashMap, // from connector ID to branch ID - connector_constraints: HashMap>, // from connector ID to encountered branch numbers - remaining_connectors: Vec, // connectors that still need to be visited -} - -// TODO: @performance, use freelists+ids instead of HashMaps -pub struct Runtime { - protocol: Arc, - pub(crate) ports: HashMap, - port_counter: u32, - pub(crate) connectors: HashMap, - connector_counter: u32, - connectors_active: VecDeque, -} - -impl Runtime { - pub fn new(pd: Arc) -> Self { - Self{ - protocol: pd, - ports: HashMap::new(), - port_counter: 0, - connectors: HashMap::new(), - connector_counter: 0, - connectors_active: VecDeque::new(), - } - } - - /// Creates a new channel that is not owned by any connector and returns its - /// endpoints. The returned values are of the (putter port, getter port) - /// respectively. - pub fn add_channel(&mut self) -> (Value, Value) { - let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, None); - return ( - port_value_from_id(None, put_id, true), - port_value_from_id(None, get_id, false) - ); - } - - pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> { - use AddComponentError as ACE; - use crate::runtime::error::AddComponentError as OldACE; - - // TODO: Remove the responsibility of adding a component from the PD - - // Lookup module and the component - // TODO: Remove this error enum translation. Note that for now this - // function forces port-only arguments - let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) { - Ok(polarities) => polarities, - Err(reason) => match reason { - OldACE::NonPortTypeParameters => return Err(ACE::InvalidArgumentType(0)), - OldACE::NoSuchModule => return Err(ACE::ModuleDoesNotExist), - OldACE::NoSuchComponent => return Err(ACE::ModuleDoesNotExist), - _ => unreachable!(), - } - }; - - // Make sure supplied values (and types) are correct. At the same time - // modify the port IDs such that they contain the ID of the connector - // we're about the create. - let component_id = self.generate_connector_id(); - let mut ports = Vec::with_capacity(values.values.len()); - - for (value_idx, value) in values.values.iter().enumerate() { - let polarity = &port_polarities[value_idx]; - - match value { - Value::Input(port_id) => { - if *polarity != Polarity::Getter { - return Err(ACE::InvalidArgumentType(value_idx)) - } - - ports.push(PortId(Id{ - connector_id: component_id, - u32_suffix: port_id.0.u32_suffix, - })); - }, - Value::Output(port_id) => { - if *polarity != Polarity::Putter { - return Err(ACE::InvalidArgumentType(value_idx)) - } - - ports.push(PortId(Id{ - connector_id: component_id, - u32_suffix: port_id.0.u32_suffix - })); - }, - _ => return Err(ACE::InvalidArgumentType(value_idx)) - } - } - - // Instantiate the component, and mark the ports as being owned by the - // newly instantiated component - let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports); - let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); - - for port in &ports { - let desc = self.ports.get_mut(port).unwrap(); - desc.owning_connector_id = Some(component_id); - } - - self.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports)); - self.connectors_active.push_back(component_id); - - Ok(()) - } - - pub fn run(&mut self) { - // Go through all active connectors - while !self.connectors_active.is_empty() { - // Run a single connector until it indicates we can run another - // connector - let next_id = self.connectors_active.pop_front().unwrap(); - let mut scheduling = Scheduling::Immediate; - - while scheduling == Scheduling::Immediate { - scheduling = self.run_connector(next_id); - } - - match scheduling { - Scheduling::Immediate => unreachable!(), - Scheduling::Later => self.connectors_active.push_back(next_id), - Scheduling::NotNow => {}, - } - - // Deal with any outgoing messages and potential solutions - self.empty_connector_outbox(next_id); - self.check_connector_new_solutions(next_id); - } - } - - /// Runs a connector for as long as sensible, then returns `true` if the - /// connector should be run again in the future, and return `false` if the - /// connector has terminated. Note that a terminated connector still - /// requires cleanup. - fn run_connector(&mut self, connector_id: u32) -> Scheduling { - let desc = self.connectors.get_mut(&connector_id).unwrap(); - - if desc.in_sync { - return self.run_connector_sync_mode(connector_id); - } else { - return self.run_connector_regular_mode(connector_id); - } - } - - #[inline] - fn run_connector_sync_mode(&mut self, connector_id: u32) -> Scheduling { - // Retrieve connector and branch that is supposed to be run - let desc = self.connectors.get_mut(&connector_id).unwrap(); - debug_assert!(desc.in_sync); - debug_assert!(!desc.spec_branches_active.is_empty()); - - let branch_index = desc.spec_branches_active.pop_front().unwrap(); - let branch = &mut desc.branches[branch_index as usize]; - debug_assert_eq!(branch_index, branch.index); - - // Run this particular branch to a next blocking point - let mut run_context = Context{ - inbox: &branch.message_inbox, - port_mapping: &branch.port_mapping, - branch_ctx: &mut branch.branch_context, - }; - - let run_result = branch.code_state.run(&mut run_context, &self.protocol); - - match run_result { - RunResult::BranchInconsistent => { - // Speculative branch became inconsistent. So we don't - // run it again - branch.branch_state = BranchState::Failed; - }, - RunResult::BranchMissingPortState(port_id) => { - // Branch called `fires()` on a port that did not have a - // value assigned yet. So branch and keep running. - debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); - debug_assert!(branch.port_mapping.get(&port_id).is_none()); - - let mut copied_branch = Self::duplicate_branch(desc, branch_index); - let copied_index = copied_branch.index; - - copied_branch.port_mapping.insert(port_id, BranchPortDesc{ - last_registered_index: None, - num_times_fired: 1, - }); - - let branch = &mut desc.branches[branch_index as usize]; // need to reborrow - branch.port_mapping.insert(port_id, BranchPortDesc{ - last_registered_index: None, - num_times_fired: 0, - }); - - // Run both again - desc.branches.push(copied_branch); - desc.spec_branches_active.push_back(branch_index); - desc.spec_branches_active.push_back(copied_index); - - return Scheduling::Immediate; - }, - RunResult::BranchMissingPortValue(port_id) => { - // Branch just performed a `get()` on a port that did - // not yet receive a value. - - // First check if a port value is assigned to the - // current branch. If so, check if it is consistent. - debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); - let mut insert_in_pending_receive = false; - - match branch.port_mapping.entry(port_id) { - Entry::Vacant(entry) => { - // No entry yet, so force to firing - entry.insert(BranchPortDesc{ - last_registered_index: None, - num_times_fired: 1, - }); - branch.branch_state = BranchState::BranchPoint; - insert_in_pending_receive = true; - }, - Entry::Occupied(entry) => { - // Have an entry, check if it is consistent - let entry = entry.get(); - if entry.num_times_fired == 0 { - // Inconsistent - branch.branch_state = BranchState::Failed; - } else { - // Perfectly fine, add to queue - debug_assert!(entry.last_registered_index.is_none()); - assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now"); - branch.branch_state = BranchState::BranchPoint; - insert_in_pending_receive = true; - } - } - } - - if insert_in_pending_receive { - // Perform the insert - match desc.spec_branches_pending_receive.entry(port_id) { - Entry::Vacant(entry) => { - entry.insert(vec![branch_index]); - } - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - debug_assert!(!entry.contains(&branch_index)); - entry.push(branch_index); - } - } - - // But also check immediately if we don't have a - // previously received message. If so, we - // immediately branch and accept the message - if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) { - for message in messages { - let mut new_branch = Self::duplicate_branch(desc, branch_index); - let new_branch_idx = new_branch.index; - let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap(); - new_port_desc.last_registered_index = Some(message.peer_cur_branch_id); - new_branch.message_inbox.insert((port_id, 1), message.message.clone()); - - desc.branches.push(new_branch); - desc.spec_branches_active.push_back(new_branch_idx); - } - - if !messages.is_empty() { - return Scheduling::Immediate; - } - } - } - }, - RunResult::BranchAtSyncEnd => { - // Check the branch for any ports that were not used and - // insert them in the port mapping as not having fired. - for port_id in branch.owned_ports.iter().copied() { - let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_id }); - if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) { - entry.insert(BranchPortDesc { - last_registered_index: None, - num_times_fired: 0 - }); - } - } - - // Mark the branch as being done - branch.branch_state = BranchState::ReachedEndSync; - desc.spec_branches_done.push(branch_index); - }, - RunResult::BranchPut(port_id, value_group) => { - debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); - debug_assert_eq!(value_group.values.len(), 1); // can only send one value - - // Branch just performed a `put()`. Check if we have - // assigned the port value and if so, if it is - // consistent. - let mut can_put = true; - branch.branch_context.just_called_did_put = true; - match branch.port_mapping.entry(port_id) { - Entry::Vacant(entry) => { - // No entry yet - entry.insert(BranchPortDesc{ - last_registered_index: Some(branch.index), - num_times_fired: 1, - }); - }, - Entry::Occupied(mut entry) => { - // Pre-existing entry - let entry = entry.get_mut(); - if entry.num_times_fired == 0 { - // This is 'fine' in the sense that we have - // a normal inconsistency in the branch. - branch.branch_state = BranchState::Failed; - can_put = false; - } else if entry.last_registered_index.is_none() { - // A put() that follows a fires() - entry.last_registered_index = Some(branch.index); - } else { - // This should be fine in the future. But - // for now we throw an error as it doesn't - // mesh well with the 'fires()' concept. - todo!("throw an error of some sort, then fail all related") - } - } - } - - if can_put { - // Actually put the message in the outbox - let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap(); - debug_assert_eq!(port_desc.owning_connector_id.unwrap(), connector_id); - let peer_id = port_desc.peer_id; - let peer_desc = self.ports.get(&peer_id).unwrap(); - debug_assert!(peer_desc.owning_connector_id.is_some()); - - let peer_id = PortId(Id{ - connector_id: peer_desc.owning_connector_id.unwrap(), - u32_suffix: peer_id - }); - - // For now this is the one and only time we're going - // to send a message. So for now we can't send a - // branch ID. - desc.global_outbox.insert_message(BufferedMessage{ - sending_port: port_id, - receiving_port: peer_id, - peer_prev_branch_id: None, - peer_cur_branch_id: branch_index, - message: value_group, - }); - - // Finally, because we were able to put the message, - // we can run the branch again - desc.spec_branches_active.push_back(branch_index); - return Scheduling::Immediate; - } - }, - _ => unreachable!("got result '{:?}' from running component in sync mode", run_result), - } - - // Did not return that we need to immediately schedule again, so - // determine if we want to do so based on the current number of active - // speculative branches - if desc.spec_branches_active.is_empty() { - return Scheduling::NotNow; - } else { - return Scheduling::Later; - } - } - - #[inline] - fn run_connector_regular_mode(&mut self, connector_id: u32) -> Scheduling { - // Retrieve the connector and the branch (which is always the first one, - // since we assume we're not running in sync-mode). - let desc = self.connectors.get_mut(&connector_id).unwrap(); - debug_assert!(!desc.in_sync); - debug_assert!(desc.spec_branches_active.is_empty()); - debug_assert_eq!(desc.branches.len(), 1); - - let branch = &mut desc.branches[0]; - - // Run this branch to its blocking point - let mut run_context = Context{ - inbox: &branch.message_inbox, - port_mapping: &branch.port_mapping, - branch_ctx: &mut branch.branch_context, - }; - let run_result = branch.code_state.run(&mut run_context, &self.protocol); - - match run_result { - RunResult::ComponentTerminated => { - branch.branch_state = BranchState::Finished; - return Scheduling::NotNow - }, - RunResult::ComponentAtSyncStart => { - // Prepare for sync execution - Self::prepare_branch_for_sync(desc); - return Scheduling::Immediate; - }, - RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { - // Find all references to ports in the provided arguments, the - // ownership of these ports will be transferred to the connector - // we're about to create. - let mut ports = Vec::with_capacity(arguments.values.len()); - find_ports_in_value_group(&arguments, &mut ports); - - // Generate a new connector with its own state - let new_component_id = self.generate_connector_id(); - let new_component_state = ComponentState { - prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments) - }; - - for port_id in &ports { - let port = self.ports.get_mut(&port_id.0.u32_suffix).unwrap(); - debug_assert_eq!(port.owning_connector_id.unwrap(), connector_id); - port.owning_connector_id = Some(new_component_id) - } - - // Finally push the new connector into the registry - let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); - self.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports)); - self.connectors_active.push_back(new_component_id); - - return Scheduling::Immediate; - }, - RunResult::NewChannel => { - // Prepare channel - debug_assert!(run_context.branch_ctx.pending_channel.is_none()); - let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, Some(connector_id)); - run_context.branch_ctx.pending_channel = Some(( - port_value_from_id(Some(connector_id), put_id, true), - port_value_from_id(Some(connector_id), get_id, false) - )); - - return Scheduling::Immediate; - }, - _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result), - } - } - - /// Puts all the messages that are currently in the outbox of a particular - /// connector into the inbox of the receivers. If possible then branches - /// will be created that receive those messages. - fn empty_connector_outbox(&mut self, connector_index: u32) { - loop { - let connector = self.connectors.get_mut(&connector_index).unwrap(); - let message_to_send = connector.global_outbox.take_next_message_to_send(); - - if message_to_send.is_none() { - return; - } - - // We have a message to send - let message_to_send = message_to_send.unwrap(); - - // Lookup the target connector - let target_port = message_to_send.receiving_port; - let port_desc = self.ports.get(&target_port.0.u32_suffix).unwrap(); - debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id); - let target_connector_id = port_desc.owning_connector_id.unwrap(); - let target_connector = self.connectors.get_mut(&target_connector_id).unwrap(); - - // In any case, always put the message in the global inbox - target_connector.global_inbox.insert_message(message_to_send.clone()); - - // Check if there are any branches that are waiting on - // receives - if let Some(branch_indices) = target_connector.spec_branches_pending_receive.get(&target_port) { - // Check each of the branches for a port mapping that - // matches the one on the message header - for branch_index in branch_indices { - let branch = &mut target_connector.branches[*branch_index as usize]; - debug_assert_eq!(branch.branch_state, BranchState::BranchPoint); - - let mut can_branch = false; - - if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) { - if port_desc.last_registered_index == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 { - can_branch = true; - } - } - - if can_branch { - // Put the message inside a clone of the currently - // waiting branch - let mut new_branch = Self::duplicate_branch(target_connector, *branch_index); - let new_branch_idx = new_branch.index; - let new_port_desc = &mut new_branch.port_mapping.get_mut(&message_to_send.receiving_port).unwrap(); - new_port_desc.last_registered_index = Some(message_to_send.peer_cur_branch_id); - new_branch.message_inbox.insert((message_to_send.receiving_port, 1), message_to_send.message.clone()); - - // And queue the branch for further execution - target_connector.branches.push(new_branch); - target_connector.spec_branches_active.push_back(new_branch_idx); - if !self.connectors_active.contains(&target_connector.id) { - self.connectors_active.push_back(target_connector.id); - } - } - } - } - } - } - - /// Checks a connector for the submitted solutions. After all neighbouring - /// connectors have been checked all of their "last checked solution" index - /// will be incremented. - fn check_connector_new_solutions(&mut self, connector_id: u32) { - // Take connector and start processing its solutions - loop { - let connector = self.connectors.get_mut(&connector_id).unwrap(); - if connector.last_checked_done == connector.spec_branches_done.len() as u32 { - // Nothing to do - return; - } - - // We have a new solution - let start_branch_index = connector.spec_branches_done[connector.last_checked_done as usize]; - connector.last_checked_done += 1; - - // Check the connector+branch combination to see if a global - // solution has already been found - if let Some(global_solution) = self.check_connector_solution(connector_id, start_branch_index) { - // Found a global solution, apply it to all the connectors that - // participate - for (connector_id, local_solution) in global_solution.connector_mapping { - self.commit_connector_solution(connector_id, local_solution.final_branch_id); - } - } - } - } - - fn check_connector_solution(&mut self, first_connector_index: u32, first_branch_index: u32) -> Option { - // Take the connector and branch of interest - let first_connector = self.connectors.get(&first_connector_index).unwrap(); - let first_branch = &first_connector.branches[first_branch_index as usize]; - debug_assert_eq!(first_branch.branch_state, BranchState::ReachedEndSync); - - // Setup the first solution - let mut first_solution = ProposedSolution{ - connector_mapping: HashMap::new(), - connector_constraints: HashMap::new(), - remaining_connectors: Vec::new(), - }; - let mut first_local_solution = ProposedConnectorSolution{ - final_branch_id: first_branch.index, - all_branch_ids: Vec::new(), - port_mapping: first_branch.port_mapping - .iter() - .map(|(port_id, port_info)| { - (port_id.0.u32_suffix, port_info.last_registered_index) - }) - .collect(), - }; - self.determine_branch_ids(first_connector, first_branch.index, &mut first_local_solution.all_branch_ids); - first_solution.connector_mapping.insert(first_connector.id, first_local_solution); - - for (port_id, port_mapping) in first_branch.port_mapping.iter() { - let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap(); - let peer_port_id = port_desc.peer_id; - let peer_port_desc = self.ports.get(&peer_port_id).unwrap(); - let peer_connector_id = peer_port_desc.owning_connector_id.unwrap(); - - let constraint = match port_mapping.last_registered_index { - Some(branch_id) => ProposedBranchConstraint::BranchNumber(branch_id), - None => ProposedBranchConstraint::SilentPort(peer_port_id), - }; - - match first_solution.connector_constraints.entry(peer_connector_id) { - Entry::Vacant(entry) => { - // Not yet encountered - entry.insert(vec![constraint]); - first_solution.remaining_connectors.push(peer_connector_id); - }, - Entry::Occupied(mut entry) => { - // Already encountered - let entry = entry.get_mut(); - if !entry.contains(&constraint) { - entry.push(constraint); - } - } - } - } - - // Setup storage for all possible solutions - let mut all_solutions = Vec::new(); - all_solutions.push(first_solution); - - while !all_solutions.is_empty() { - let mut cur_solution = all_solutions.pop().unwrap(); - - if cur_solution.remaining_connectors.is_empty() { - // All connectors have been visited, so commit the solution - debug_assert!(cur_solution.connector_constraints.is_empty()); - return Some(cur_solution); - } else { - // Not all connectors have been visited yet, so take one of the - // connectors and visit it. - let target_connector = cur_solution.remaining_connectors.pop().unwrap(); - self.merge_solution_with_connector(&mut cur_solution, &mut all_solutions, target_connector); - } - } - - // No satisfying solution found - return None; - } - - fn merge_solution_with_connector(&self, cur_solution: &mut ProposedSolution, all_solutions: &mut Vec, target_connector: u32) { - debug_assert!(!cur_solution.connector_mapping.contains_key(&target_connector)); // not yet visited - debug_assert!(cur_solution.connector_constraints.contains_key(&target_connector)); // but we encountered a reference to it - - let branch_constraints = cur_solution.connector_constraints.get(&target_connector).unwrap(); - let cur_connector = self.connectors.get(&target_connector).unwrap(); - - // Make sure all propositions are unique - for i in 0..branch_constraints.len() { - let proposition_i = branch_constraints[i]; - for j in 0..i { - let proposition_j = branch_constraints[j]; - debug_assert_ne!(proposition_i, proposition_j); - } - } - - // Go through the current connector's branches that have finished - 'branch_loop: for finished_branch_idx in cur_connector.spec_branches_done.iter().copied() { - let finished_branch = &cur_connector.branches[finished_branch_idx as usize]; - - // Construct a list of all the parent branch numbers - let mut parent_branch_ids = Vec::new(); - self.determine_branch_ids(cur_connector, finished_branch_idx, &mut parent_branch_ids); - - // Go through all constraints and make sure they are satisfied by - // the current branch - let mut all_constraints_satisfied = true; - - for constraint in branch_constraints { - match constraint { - ProposedBranchConstraint::SilentPort(port_id) => { - // Specified should have remained silent - let port_id = PortId(Id{ - connector_id: target_connector, - u32_suffix: *port_id, - }); - debug_assert!(finished_branch.port_mapping.contains_key(&port_id)); - let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap(); - all_constraints_satisfied = all_constraints_satisfied && mapped_port.num_times_fired == 0; - }, - ProposedBranchConstraint::BranchNumber(branch_id) => { - // Branch number should have appeared in the - // predecessor branches. - all_constraints_satisfied = all_constraints_satisfied && parent_branch_ids.contains(branch_id); - }, - ProposedBranchConstraint::PortMapping(port_id, branch_id) => { - // Port should map to a particular branch number - let port_id = PortId(Id{ - connector_id: target_connector, - u32_suffix: *port_id, - }); - debug_assert!(finished_branch.port_mapping.contains_key(&port_id)); - let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap(); - all_constraints_satisfied = all_constraints_satisfied && mapped_port.last_registered_index == Some(*branch_id); - } - } - - if !all_constraints_satisfied { - break; - } - } - - if !all_constraints_satisfied { - continue; - } - - // If here, then all constraints on the finished branch are - // satisfied. But the finished branch also puts constraints on the - // other connectors. So either: - // 1. Add them to the list of constraints a peer connector should - // adhere to. - // 2. Make sure that the provided connector solution matches the - // constraints imposed by the currently considered finished branch - // - // To make our lives a bit easier we already insert our proposed - // local solution into a prepared global solution. This makes - // looking up remote ports easier (since the channel might have its - // two ends owned by the same connector). - let mut new_solution = cur_solution.clone(); - debug_assert!(!new_solution.remaining_connectors.contains(&target_connector)); - new_solution.connector_constraints.remove(&target_connector); - new_solution.connector_mapping.insert(target_connector, ProposedConnectorSolution{ - final_branch_id: finished_branch.index, - all_branch_ids: parent_branch_ids, - port_mapping: finished_branch.port_mapping - .iter() - .map(|(port_id, port_desc)| { - (port_id.0.u32_suffix, port_desc.last_registered_index) - }) - .collect(), - }); - - for (local_port_id, port_desc) in &finished_branch.port_mapping { - // Retrieve port of peer - let port_info = self.ports.get(&local_port_id.0.u32_suffix).unwrap(); - let peer_port_id = port_info.peer_id; - let peer_port_info = self.ports.get(&peer_port_id).unwrap(); - let peer_connector_id = peer_port_info.owning_connector_id.unwrap(); - - // If the connector was not present in the new global solution - // yet, add it now, as it simplifies the following logic - if !new_solution.connector_mapping.contains_key(&peer_connector_id) && !new_solution.remaining_connectors.contains(&peer_connector_id) { - new_solution.connector_constraints.insert(peer_connector_id, Vec::new()); - new_solution.remaining_connectors.push(peer_connector_id); - } - - if new_solution.remaining_connectors.contains(&peer_connector_id) { - // Constraint applies to a connector that has not yet been - // visited - debug_assert!(new_solution.connector_constraints.contains_key(&peer_connector_id)); - debug_assert_ne!(peer_connector_id, target_connector); - - let new_constraint = if port_desc.num_times_fired == 0 { - ProposedBranchConstraint::SilentPort(peer_port_id) - } else if peer_port_info.is_getter { - // Peer port is a getter, so we want its port to map to - // the branch number in our port mapping. - debug_assert!(port_desc.last_registered_index.is_some()); - ProposedBranchConstraint::PortMapping(peer_port_id, port_desc.last_registered_index.unwrap()) - } else { - // Peer port is a putter, so we want to restrict the - // solution's run to contain the branch ID we received. - ProposedBranchConstraint::BranchNumber(port_desc.last_registered_index.unwrap()) - }; - - let peer_constraints = new_solution.connector_constraints.get_mut(&peer_connector_id).unwrap(); - if !peer_constraints.contains(&new_constraint) { - peer_constraints.push(new_constraint); - } - } else { - // Constraint applies to an already visited connector - let peer_solution = new_solution.connector_mapping.get(&peer_connector_id).unwrap(); - if port_desc.num_times_fired == 0 { - let peer_mapped_id = peer_solution.port_mapping.get(&peer_port_id).unwrap(); - if peer_mapped_id.is_some() { - all_constraints_satisfied = false; - break; - } - } else if peer_port_info.is_getter { - // Peer is getter, so its port should be mapped to one - // of our branch IDs. To simplify lookup we look at the - // last message we sent to the getter. - debug_assert!(port_desc.last_registered_index.is_some()); - let peer_port = peer_solution.port_mapping.get(&peer_port_id) - .map_or(None, |v| *v); - - if port_desc.last_registered_index != peer_port { - // No match - all_constraints_satisfied = false; - break; - } - } else { - // Peer is putter, so we expect to find our port mapping - // to match one of the branch numbers in the peer - // connector's local solution - debug_assert!(port_desc.last_registered_index.is_some()); - let expected_branch_id = port_desc.last_registered_index.unwrap(); - - if !peer_solution.all_branch_ids.contains(&expected_branch_id) { - all_constraints_satisfied = false; - break; - } - } - } - } - - if !all_constraints_satisfied { - // Final checks failed - continue 'branch_loop - } - - // We're sure that this branch matches the provided solution, so - // push it onto the list of considered solutions - all_solutions.push(new_solution); - } - } - - fn commit_connector_solution(&mut self, connector_id: u32, branch_id: u32) { - // Retrieve connector and branch - let connector = self.connectors.get_mut(&connector_id).unwrap(); - debug_assert_ne!(branch_id, 0); // because at 0 we have our initial backed-up non-sync branch - debug_assert!(connector.in_sync); - debug_assert!(connector.spec_branches_done.contains(&branch_id)); - - // Put the selected solution in front, the branch at index 0 is the - // "non-sync" branch. - connector.branches.swap(0, branch_id as usize); - connector.branches.truncate(1); - - // And reset the connector's state for further execution - connector.in_sync = false; - connector.spec_branches_active.clear(); - connector.spec_branches_pending_receive.clear(); - connector.spec_branches_done.clear(); - connector.last_checked_done = 0; - connector.global_inbox.clear(); - connector.global_outbox.clear(); - - // Do the same thing for the final selected branch - let final_branch = &mut connector.branches[0]; - final_branch.index = 0; - final_branch.parent_index = None; - debug_assert_eq!(final_branch.branch_state, BranchState::ReachedEndSync); - final_branch.branch_state = BranchState::RunningNonSync; - final_branch.message_inbox.clear(); - final_branch.port_mapping.clear(); - - // Might be that the connector was no longer running, if so, put it back - // in the list of connectors to run - if !self.connectors_active.contains(&connector_id) { - self.connectors_active.push_back(connector_id); - } - } - - fn generate_connector_id(&mut self) -> u32 { - let id = self.connector_counter; - self.connector_counter += 1; - return id; - } - - // ------------------------------------------------------------------------- - // Helpers for port management - // ------------------------------------------------------------------------- - - #[inline] - fn add_owned_channel(ports: &mut HashMap, port_counter: &mut u32, owning_connector_id: Option) -> (u32, u32) { - let get_id = *port_counter; - let put_id = *port_counter + 1; - (*port_counter) += 2; - - ports.insert(get_id, PortDesc{ - id: get_id, - peer_id: put_id, - owning_connector_id, - is_getter: true, - }); - ports.insert(put_id, PortDesc{ - id: put_id, - peer_id: get_id, - owning_connector_id, - is_getter: false, - }); - - return (put_id, get_id); - } - - // ------------------------------------------------------------------------- - // Helpers for branch management - // ------------------------------------------------------------------------- - - /// Prepares a speculative branch for further execution from the connector's - /// non-speculative base branch. - fn prepare_branch_for_sync(desc: &mut ConnectorDesc) { - // Ensure only one branch is active, the non-sync branch - debug_assert!(!desc.in_sync); - debug_assert_eq!(desc.branches.len(), 1); - debug_assert!(desc.spec_branches_active.is_empty()); - let new_branch_index = 1; - - // Push first speculative branch as active branch - let new_branch = BranchDesc::new_sync_from(new_branch_index, &desc.branches[0]); - desc.branches.push(new_branch); - desc.spec_branches_active.push_back(new_branch_index); - desc.in_sync = true; - } - - /// Duplicates a particular (speculative) branch and returns it. Due to - /// borrowing rules in code that uses this helper the returned branch still - /// needs to be pushed onto the member `branches`. - fn duplicate_branch(desc: &ConnectorDesc, original_branch_idx: u32) -> BranchDesc { - let original_branch = &desc.branches[original_branch_idx as usize]; - debug_assert!(desc.in_sync); - - let copied_index = desc.branches.len() as u32; - let copied_branch = BranchDesc::new_sync_from(copied_index, original_branch); - - return copied_branch; - } - - /// Retrieves all parent IDs of a particular branch. These numbers run from - /// the leaf towards the parent. - fn determine_branch_ids(&self, desc: &ConnectorDesc, first_branch_index: u32, result: &mut Vec) { - let mut next_branch_index = first_branch_index; - result.clear(); - - loop { - result.push(next_branch_index); - let branch = &desc.branches[next_branch_index as usize]; - - match branch.parent_index { - Some(index) => next_branch_index = index, - None => return, - } - } - } -} - -/// Context accessible by the code while being executed by the runtime. When the -/// code is being executed by the runtime it sometimes needs to interact with -/// the runtime. This is achieved by the "code throwing an error code", after -/// which the runtime modifies the appropriate variables and continues executing -/// the code again. -struct Context<'a> { - // Temporary references to branch related storage - inbox: &'a HashMap<(PortId, u32), ValueGroup>, - port_mapping: &'a HashMap, - branch_ctx: &'a mut BranchContext, -} - -impl<'a> crate::protocol::RunContext for Context<'a> { - fn did_put(&mut self, _port: PortId) -> bool { - // Note that we want "did put" to return false if we have fired zero - // times, because this implies we did a prevous - let old_value = self.branch_ctx.just_called_did_put; - self.branch_ctx.just_called_did_put = false; - return old_value; - } - - fn get(&mut self, port: PortId) -> Option { - let inbox_key = (port, 1); - match self.inbox.get(&inbox_key) { - None => None, - Some(value) => Some(value.clone()), - } - } - - fn fires(&mut self, port: PortId) -> Option { - match self.port_mapping.get(&port) { - None => None, - Some(port_info) => Some(Value::Bool(port_info.num_times_fired != 0)), - } - } - - fn get_channel(&mut self) -> Option<(Value, Value)> { - self.branch_ctx.pending_channel.take() - } -} - -/// Recursively goes through the value group, attempting to find ports. -/// Duplicates will only be added once. -fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { - // Helper to check a value for a port and recurse if needed. - fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { - match value { - Value::Input(port_id) | Value::Output(port_id) => { - // This is an actual port - for prev_port in ports.iter() { - if prev_port == port_id { - // Already added - return; - } - } - - ports.push(*port_id); - }, - Value::Array(heap_pos) | - Value::Message(heap_pos) | - Value::String(heap_pos) | - Value::Struct(heap_pos) | - Value::Union(_, heap_pos) => { - // Reference to some dynamic thing which might contain ports, - // so recurse - let heap_region = &group.regions[*heap_pos as usize]; - for embedded_value in heap_region { - find_port_in_value(group, embedded_value, ports); - } - }, - _ => {}, // values we don't care about - } - } - - // Clear the ports, then scan all the available values - ports.clear(); - for value in &value_group.values { - find_port_in_value(value_group, value, ports); - } -} - -fn port_value_from_id(connector_id: Option, port_id: u32, is_output: bool) -> Value { - let connector_id = connector_id.unwrap_or(u32::MAX); // TODO: @hack, review entire PortId/ConnectorId/Id system - if is_output { - return Value::Output(PortId(Id{ - connector_id, - u32_suffix: port_id - })); - } else { - return Value::Input(PortId(Id{ - connector_id, - u32_suffix: port_id, - })); - } -} \ No newline at end of file