use std::sync::Arc; use std::collections::{HashMap, VecDeque}; use std::collections::hash_map::{Entry}; use crate::{Polarity, PortId}; use crate::common::Id; use crate::protocol::*; use crate::protocol::eval::*; use super::messages::*; #[derive(Debug)] pub enum AddComponentError { ModuleDoesNotExist, ConnectorDoesNotExist, InvalidArgumentType(usize), // value is index of (first) invalid argument } pub(crate) struct PortDesc { id: u32, peer_id: u32, owning_connector_id: Option, is_getter: bool, // otherwise one can only call `put` } pub(crate) struct ConnectorDesc { id: u32, in_sync: bool, pub(crate) branches: Vec, // first one is always non-speculative one spec_branches_active: VecDeque, // branches that can be run immediately spec_branches_pending_receive: HashMap>, // from port_id to branch index spec_branches_done: Vec, last_checked_done: u32, global_inbox: ConnectorInbox, global_outbox: ConnectorOutbox, } impl ConnectorDesc { /// Creates a new connector description. Implicit assumption is that there /// is one (non-sync) branch that can be immediately executed. fn new(id: u32, component_state: ComponentState, owned_ports: Vec) -> Self { Self{ id, in_sync: false, branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)], spec_branches_active: VecDeque::new(), spec_branches_pending_receive: HashMap::new(), spec_branches_done: Vec::new(), last_checked_done: 0, global_inbox: ConnectorInbox::new(), global_outbox: ConnectorOutbox::new(), } } } #[derive(Debug, PartialEq, Eq)] pub(crate) enum BranchState { RunningNonSync, // regular running non-speculative branch RunningSync, // regular running speculative branch BranchPoint, // branch which ended up being a branching point ReachedEndSync, // branch that successfully reached the end-sync point, is a possible local solution Failed, // branch that became inconsistent Finished, // branch (necessarily non-sync) that reached end of code } #[derive(Debug, Clone)] struct BranchPortDesc { last_registered_index: Option, // if putter, then last sent branch ID, if getter, then last received branch ID num_times_fired: u32, // number of puts/gets on this port } struct BranchContext { just_called_did_put: bool, pending_channel: Option<(Value, Value)>, } pub(crate) struct BranchDesc { index: u32, parent_index: Option, pub(crate) code_state: ComponentState, pub(crate) branch_state: BranchState, owned_ports: Vec, message_inbox: HashMap<(PortId, u32), ValueGroup>, // from (port id, 1-based recv index) to received value port_mapping: HashMap, branch_context: BranchContext, } impl BranchDesc { /// Creates the first non-sync branch of a connector fn new_non_sync(component_state: ComponentState, owned_ports: Vec) -> Self { Self{ index: 0, parent_index: None, code_state: component_state, branch_state: BranchState::RunningNonSync, owned_ports, message_inbox: HashMap::new(), port_mapping: HashMap::new(), branch_context: BranchContext{ just_called_did_put: false, pending_channel: None, } } } /// Creates a sync branch based on the supplied branch. This supplied branch /// is the branching point for the new one, i.e. the parent in the branching /// tree. fn new_sync_from(index: u32, branch_state: &BranchDesc) -> Self { // We expect that the given branche's context is not halfway handling a // `put(...)` or a `channel x -> y` statement. debug_assert!(!branch_state.branch_context.just_called_did_put); debug_assert!(branch_state.branch_context.pending_channel.is_none()); Self{ index, parent_index: Some(branch_state.index), code_state: branch_state.code_state.clone(), branch_state: BranchState::RunningSync, owned_ports: branch_state.owned_ports.clone(), message_inbox: branch_state.message_inbox.clone(), port_mapping: branch_state.port_mapping.clone(), branch_context: BranchContext{ just_called_did_put: false, pending_channel: None, } } } } #[derive(PartialEq, Eq)] enum Scheduling { Immediate, Later, NotNow, } #[derive(Clone, Copy, Eq, PartialEq, Debug)] enum ProposedBranchConstraint { SilentPort(u32), // port id BranchNumber(u32), // branch id PortMapping(u32, u32), // particular port's mapped branch number } // Local solution of the connector #[derive(Clone)] struct ProposedConnectorSolution { final_branch_id: u32, all_branch_ids: Vec, // the final branch ID and, recursively, all parents port_mapping: HashMap>, // port IDs of the connector, mapped to their branch IDs (None for silent ports) } #[derive(Clone)] struct ProposedSolution { connector_mapping: HashMap, // from connector ID to branch ID connector_constraints: HashMap>, // from connector ID to encountered branch numbers remaining_connectors: Vec, // connectors that still need to be visited } // TODO: @performance, use freelists+ids instead of HashMaps pub struct Runtime { protocol: Arc, pub(crate) ports: HashMap, port_counter: u32, pub(crate) connectors: HashMap, connector_counter: u32, connectors_active: VecDeque, } impl Runtime { pub fn new(pd: Arc) -> Self { Self{ protocol: pd, ports: HashMap::new(), port_counter: 0, connectors: HashMap::new(), connector_counter: 0, connectors_active: VecDeque::new(), } } /// Creates a new channel that is not owned by any connector and returns its /// endpoints. The returned values are of the (putter port, getter port) /// respectively. pub fn add_channel(&mut self) -> (Value, Value) { let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, None); return ( port_value_from_id(None, put_id, true), port_value_from_id(None, get_id, false) ); } pub fn add_component(&mut self, module: &str, procedure: &str, values: ValueGroup) -> Result<(), AddComponentError> { use AddComponentError as ACE; use crate::runtime::error::AddComponentError as OldACE; // TODO: Remove the responsibility of adding a component from the PD // Lookup module and the component // TODO: Remove this error enum translation. Note that for now this // function forces port-only arguments let port_polarities = match self.protocol.component_polarities(module.as_bytes(), procedure.as_bytes()) { Ok(polarities) => polarities, Err(reason) => match reason { OldACE::NonPortTypeParameters => return Err(ACE::InvalidArgumentType(0)), OldACE::NoSuchModule => return Err(ACE::ModuleDoesNotExist), OldACE::NoSuchComponent => return Err(ACE::ModuleDoesNotExist), _ => unreachable!(), } }; // Make sure supplied values (and types) are correct. At the same time // modify the port IDs such that they contain the ID of the connector // we're about the create. let component_id = self.generate_connector_id(); let mut ports = Vec::with_capacity(values.values.len()); for (value_idx, value) in values.values.iter().enumerate() { let polarity = &port_polarities[value_idx]; match value { Value::Input(port_id) => { if *polarity != Polarity::Getter { return Err(ACE::InvalidArgumentType(value_idx)) } ports.push(PortId(Id{ connector_id: component_id, u32_suffix: port_id.0.u32_suffix, })); }, Value::Output(port_id) => { if *polarity != Polarity::Putter { return Err(ACE::InvalidArgumentType(value_idx)) } ports.push(PortId(Id{ connector_id: component_id, u32_suffix: port_id.0.u32_suffix })); }, _ => return Err(ACE::InvalidArgumentType(value_idx)) } } // Instantiate the component, and mark the ports as being owned by the // newly instantiated component let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports); let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); for port in &ports { let desc = self.ports.get_mut(port).unwrap(); desc.owning_connector_id = Some(component_id); } self.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports)); self.connectors_active.push_back(component_id); Ok(()) } pub fn run(&mut self) { // Go through all active connectors while !self.connectors_active.is_empty() { // Run a single connector until it indicates we can run another // connector let next_id = self.connectors_active.pop_front().unwrap(); let mut scheduling = Scheduling::Immediate; while scheduling == Scheduling::Immediate { scheduling = self.run_connector(next_id); } match scheduling { Scheduling::Immediate => unreachable!(), Scheduling::Later => self.connectors_active.push_back(next_id), Scheduling::NotNow => {}, } // Deal with any outgoing messages and potential solutions self.empty_connector_outbox(next_id); self.check_connector_new_solutions(next_id); } } /// Runs a connector for as long as sensible, then returns `true` if the /// connector should be run again in the future, and return `false` if the /// connector has terminated. Note that a terminated connector still /// requires cleanup. fn run_connector(&mut self, connector_id: u32) -> Scheduling { let desc = self.connectors.get_mut(&connector_id).unwrap(); if desc.in_sync { return self.run_connector_sync_mode(connector_id); } else { return self.run_connector_regular_mode(connector_id); } } #[inline] fn run_connector_sync_mode(&mut self, connector_id: u32) -> Scheduling { // Retrieve connector and branch that is supposed to be run let desc = self.connectors.get_mut(&connector_id).unwrap(); debug_assert!(desc.in_sync); debug_assert!(!desc.spec_branches_active.is_empty()); let branch_index = desc.spec_branches_active.pop_front().unwrap(); let branch = &mut desc.branches[branch_index as usize]; debug_assert_eq!(branch_index, branch.index); // Run this particular branch to a next blocking point let mut run_context = Context{ inbox: &branch.message_inbox, port_mapping: &branch.port_mapping, branch_ctx: &mut branch.branch_context, }; let run_result = branch.code_state.run(&mut run_context, &self.protocol); match run_result { RunResult::BranchInconsistent => { // Speculative branch became inconsistent. So we don't // run it again branch.branch_state = BranchState::Failed; }, RunResult::BranchMissingPortState(port_id) => { // Branch called `fires()` on a port that did not have a // value assigned yet. So branch and keep running. debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); debug_assert!(branch.port_mapping.get(&port_id).is_none()); let mut copied_branch = Self::duplicate_branch(desc, branch_index); let copied_index = copied_branch.index; copied_branch.port_mapping.insert(port_id, BranchPortDesc{ last_registered_index: None, num_times_fired: 1, }); let branch = &mut desc.branches[branch_index as usize]; // need to reborrow branch.port_mapping.insert(port_id, BranchPortDesc{ last_registered_index: None, num_times_fired: 0, }); // Run both again desc.branches.push(copied_branch); desc.spec_branches_active.push_back(branch_index); desc.spec_branches_active.push_back(copied_index); return Scheduling::Immediate; }, RunResult::BranchMissingPortValue(port_id) => { // Branch just performed a `get()` on a port that did // not yet receive a value. // First check if a port value is assigned to the // current branch. If so, check if it is consistent. debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); let mut insert_in_pending_receive = false; println!("DEBUG: Connector {} performing get on {:#?}", connector_id, port_id); match branch.port_mapping.entry(port_id) { Entry::Vacant(entry) => { // No entry yet, so force to firing entry.insert(BranchPortDesc{ last_registered_index: None, num_times_fired: 1, }); branch.branch_state = BranchState::BranchPoint; insert_in_pending_receive = true; }, Entry::Occupied(entry) => { // Have an entry, check if it is consistent let entry = entry.get(); if entry.num_times_fired == 0 { // Inconsistent branch.branch_state = BranchState::Failed; } else { // Perfectly fine, add to queue debug_assert!(entry.last_registered_index.is_none()); assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now"); branch.branch_state = BranchState::BranchPoint; insert_in_pending_receive = true; } } } println!("DEBUG: insert = {}, port mapping is now {:#?}", insert_in_pending_receive, &branch.port_mapping); if insert_in_pending_receive { // Perform the insert match desc.spec_branches_pending_receive.entry(port_id) { Entry::Vacant(entry) => { entry.insert(vec![branch_index]); } Entry::Occupied(mut entry) => { let entry = entry.get_mut(); debug_assert!(!entry.contains(&branch_index)); entry.push(branch_index); } } // But also check immediately if we don't have a // previously received message. If so, we // immediately branch and accept the message if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) { for message in messages { let mut new_branch = Self::duplicate_branch(desc, branch_index); let new_branch_idx = new_branch.index; let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap(); new_port_desc.last_registered_index = Some(message.peer_cur_branch_id); new_branch.message_inbox.insert((port_id, 1), message.message.clone()); desc.branches.push(new_branch); desc.spec_branches_active.push_back(new_branch_idx); } if !messages.is_empty() { return Scheduling::Immediate; } } } }, RunResult::BranchAtSyncEnd => { // Check the branch for any ports that were not used and // insert them in the port mapping as not having fired. for port_id in branch.owned_ports.iter().copied() { let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_id }); if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) { entry.insert(BranchPortDesc { last_registered_index: None, num_times_fired: 0 }); } } // Mark the branch as being done branch.branch_state = BranchState::ReachedEndSync; desc.spec_branches_done.push(branch_index); }, RunResult::BranchPut(port_id, value_group) => { debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); debug_assert_eq!(value_group.values.len(), 1); // can only send one value // Branch just performed a `put()`. Check if we have // assigned the port value and if so, if it is // consistent. println!("DEBUG: Connector {} performing put on {:#?}", connector_id, port_id); let mut can_put = true; branch.branch_context.just_called_did_put = true; match branch.port_mapping.entry(port_id) { Entry::Vacant(entry) => { // No entry yet entry.insert(BranchPortDesc{ last_registered_index: Some(branch.index), num_times_fired: 1, }); }, Entry::Occupied(mut entry) => { // Pre-existing entry let entry = entry.get_mut(); if entry.num_times_fired == 0 { // This is 'fine' in the sense that we have // a normal inconsistency in the branch. branch.branch_state = BranchState::Failed; can_put = false; } else if entry.last_registered_index.is_none() { // A put() that follows a fires() entry.last_registered_index = Some(branch.index); } else { // This should be fine in the future. But // for now we throw an error as it doesn't // mesh well with the 'fires()' concept. todo!("throw an error of some sort, then fail all related") } } } println!("DEBUG: can_put = {}, port mapping is now {:#?}", can_put, &branch.port_mapping); if can_put { // Actually put the message in the outbox let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap(); debug_assert_eq!(port_desc.owning_connector_id.unwrap(), connector_id); let peer_id = port_desc.peer_id; let peer_desc = self.ports.get(&peer_id).unwrap(); debug_assert!(peer_desc.owning_connector_id.is_some()); let peer_id = PortId(Id{ connector_id: peer_desc.owning_connector_id.unwrap(), u32_suffix: peer_id }); // For now this is the one and only time we're going // to send a message. So for now we can't send a // branch ID. desc.global_outbox.insert_message(BufferedMessage{ sending_port: port_id, receiving_port: peer_id, peer_prev_branch_id: None, peer_cur_branch_id: branch_index, message: value_group, }); // Finally, because we were able to put the message, // we can run the branch again desc.spec_branches_active.push_back(branch_index); return Scheduling::Immediate; } }, _ => unreachable!("got result '{:?}' from running component in sync mode", run_result), } // Did not return that we need to immediately schedule again, so // determine if we want to do so based on the current number of active // speculative branches if desc.spec_branches_active.is_empty() { return Scheduling::NotNow; } else { return Scheduling::Later; } } #[inline] fn run_connector_regular_mode(&mut self, connector_id: u32) -> Scheduling { // Retrieve the connector and the branch (which is always the first one, // since we assume we're not running in sync-mode). let desc = self.connectors.get_mut(&connector_id).unwrap(); debug_assert!(!desc.in_sync); debug_assert!(desc.spec_branches_active.is_empty()); debug_assert_eq!(desc.branches.len(), 1); let branch = &mut desc.branches[0]; // Run this branch to its blocking point let mut run_context = Context{ inbox: &branch.message_inbox, port_mapping: &branch.port_mapping, branch_ctx: &mut branch.branch_context, }; let run_result = branch.code_state.run(&mut run_context, &self.protocol); match run_result { RunResult::ComponentTerminated => { branch.branch_state = BranchState::Finished; return Scheduling::NotNow }, RunResult::ComponentAtSyncStart => { // Prepare for sync execution Self::prepare_branch_for_sync(desc); return Scheduling::Immediate; }, RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { // Find all references to ports in the provided arguments, the // ownership of these ports will be transferred to the connector // we're about to create. let mut ports = Vec::with_capacity(arguments.values.len()); find_ports_in_value_group(&arguments, &mut ports); // Generate a new connector with its own state let new_component_id = self.generate_connector_id(); let new_component_state = ComponentState { prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments) }; for port_id in &ports { let port = self.ports.get_mut(&port_id.0.u32_suffix).unwrap(); debug_assert_eq!(port.owning_connector_id.unwrap(), connector_id); port.owning_connector_id = Some(new_component_id) } // Finally push the new connector into the registry let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); self.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports)); self.connectors_active.push_back(new_component_id); return Scheduling::Immediate; }, RunResult::NewChannel => { // Prepare channel debug_assert!(run_context.branch_ctx.pending_channel.is_none()); let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, Some(connector_id)); run_context.branch_ctx.pending_channel = Some(( port_value_from_id(Some(connector_id), put_id, true), port_value_from_id(Some(connector_id), get_id, false) )); return Scheduling::Immediate; }, _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result), } } /// Puts all the messages that are currently in the outbox of a particular /// connector into the inbox of the receivers. If possible then branches /// will be created that receive those messages. fn empty_connector_outbox(&mut self, connector_index: u32) { loop { let connector = self.connectors.get_mut(&connector_index).unwrap(); let message_to_send = connector.global_outbox.take_next_message_to_send(); if message_to_send.is_none() { return; } // We have a message to send let message_to_send = message_to_send.unwrap(); // Lookup the target connector let target_port = message_to_send.receiving_port; let port_desc = self.ports.get(&target_port.0.u32_suffix).unwrap(); debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id); let target_connector_id = port_desc.owning_connector_id.unwrap(); let target_connector = self.connectors.get_mut(&target_connector_id).unwrap(); // In any case, always put the message in the global inbox target_connector.global_inbox.insert_message(message_to_send.clone()); // Check if there are any branches that are waiting on // receives if let Some(branch_indices) = target_connector.spec_branches_pending_receive.get(&target_port) { // Check each of the branches for a port mapping that // matches the one on the message header for branch_index in branch_indices { let branch = &mut target_connector.branches[*branch_index as usize]; debug_assert_eq!(branch.branch_state, BranchState::BranchPoint); let mut can_branch = false; if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) { if port_desc.last_registered_index == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 { can_branch = true; } } if can_branch { // Put the message inside a clone of the currently // waiting branch let mut new_branch = Self::duplicate_branch(target_connector, *branch_index); let new_branch_idx = new_branch.index; let new_port_desc = &mut new_branch.port_mapping.get_mut(&message_to_send.receiving_port).unwrap(); new_port_desc.last_registered_index = Some(message_to_send.peer_cur_branch_id); new_branch.message_inbox.insert((message_to_send.receiving_port, 1), message_to_send.message.clone()); // And queue the branch for further execution target_connector.branches.push(new_branch); target_connector.spec_branches_active.push_back(new_branch_idx); if !self.connectors_active.contains(&target_connector.id) { self.connectors_active.push_back(target_connector.id); } } } } } } /// Checks a connector for the submitted solutions. After all neighbouring /// connectors have been checked all of their "last checked solution" index /// will be incremented. fn check_connector_new_solutions(&mut self, connector_id: u32) { // Take connector and start processing its solutions loop { let connector = self.connectors.get_mut(&connector_id).unwrap(); if connector.last_checked_done == connector.spec_branches_done.len() as u32 { // Nothing to do return; } // We have a new solution let start_branch_index = connector.spec_branches_done[connector.last_checked_done as usize]; connector.last_checked_done += 1; // Check the connector+branch combination to see if a global // solution has already been found if let Some(global_solution) = self.check_connector_solution(connector_id, start_branch_index) { // Found a global solution, apply it to all the connectors that // participate for (connector_id, local_solution) in global_solution.connector_mapping { self.commit_connector_solution(connector_id, local_solution.final_branch_id); } } } } fn check_connector_solution(&mut self, first_connector_index: u32, first_branch_index: u32) -> Option { // Take the connector and branch of interest let first_connector = self.connectors.get(&first_connector_index).unwrap(); let first_branch = &first_connector.branches[first_branch_index as usize]; debug_assert_eq!(first_branch.branch_state, BranchState::ReachedEndSync); // Setup the first solution let mut first_solution = ProposedSolution{ connector_mapping: HashMap::new(), connector_constraints: HashMap::new(), remaining_connectors: Vec::new(), }; let mut first_local_solution = ProposedConnectorSolution{ final_branch_id: first_branch.index, all_branch_ids: Vec::new(), port_mapping: first_branch.port_mapping .iter() .map(|(port_id, port_info)| { (port_id.0.u32_suffix, port_info.last_registered_index) }) .collect(), }; self.determine_branch_ids(first_connector, first_branch.index, &mut first_local_solution.all_branch_ids); first_solution.connector_mapping.insert(first_connector.id, first_local_solution); for (port_id, port_mapping) in first_branch.port_mapping.iter() { let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap(); let peer_port_id = port_desc.peer_id; let peer_port_desc = self.ports.get(&peer_port_id).unwrap(); let peer_connector_id = peer_port_desc.owning_connector_id.unwrap(); let constraint = match port_mapping.last_registered_index { Some(branch_id) => ProposedBranchConstraint::BranchNumber(branch_id), None => ProposedBranchConstraint::SilentPort(peer_port_id), }; match first_solution.connector_constraints.entry(peer_connector_id) { Entry::Vacant(entry) => { // Not yet encountered entry.insert(vec![constraint]); first_solution.remaining_connectors.push(peer_connector_id); }, Entry::Occupied(mut entry) => { // Already encountered let entry = entry.get_mut(); if !entry.contains(&constraint) { entry.push(constraint); } } } } // Setup storage for all possible solutions let mut all_solutions = Vec::new(); all_solutions.push(first_solution); while !all_solutions.is_empty() { let mut cur_solution = all_solutions.pop().unwrap(); if cur_solution.remaining_connectors.is_empty() { // All connectors have been visited, so commit the solution debug_assert!(cur_solution.connector_constraints.is_empty()); return Some(cur_solution); } else { // Not all connectors have been visited yet, so take one of the // connectors and visit it. let target_connector = cur_solution.remaining_connectors.pop().unwrap(); self.merge_solution_with_connector(&mut cur_solution, &mut all_solutions, target_connector); } } // No satisfying solution found return None; } fn merge_solution_with_connector(&self, cur_solution: &mut ProposedSolution, all_solutions: &mut Vec, target_connector: u32) { debug_assert!(!cur_solution.connector_mapping.contains_key(&target_connector)); // not yet visited debug_assert!(cur_solution.connector_constraints.contains_key(&target_connector)); // but we encountered a reference to it let branch_constraints = cur_solution.connector_constraints.get(&target_connector).unwrap(); let cur_connector = self.connectors.get(&target_connector).unwrap(); // Make sure all propositions are unique for i in 0..branch_constraints.len() { let proposition_i = branch_constraints[i]; for j in 0..i { let proposition_j = branch_constraints[j]; debug_assert_ne!(proposition_i, proposition_j); } } // Go through the current connector's branches that have finished 'branch_loop: for finished_branch_idx in cur_connector.spec_branches_done.iter().copied() { let finished_branch = &cur_connector.branches[finished_branch_idx as usize]; // Construct a list of all the parent branch numbers let mut parent_branch_ids = Vec::new(); self.determine_branch_ids(cur_connector, finished_branch_idx, &mut parent_branch_ids); // Go through all constraints and make sure they are satisfied by // the current branch let mut all_constraints_satisfied = true; for constraint in branch_constraints { match constraint { ProposedBranchConstraint::SilentPort(port_id) => { // Specified should have remained silent let port_id = PortId(Id{ connector_id: target_connector, u32_suffix: *port_id, }); debug_assert!(finished_branch.port_mapping.contains_key(&port_id)); let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap(); all_constraints_satisfied = all_constraints_satisfied && mapped_port.num_times_fired == 0; }, ProposedBranchConstraint::BranchNumber(branch_id) => { // Branch number should have appeared in the // predecessor branches. all_constraints_satisfied = all_constraints_satisfied && parent_branch_ids.contains(branch_id); }, ProposedBranchConstraint::PortMapping(port_id, branch_id) => { // Port should map to a particular branch number let port_id = PortId(Id{ connector_id: target_connector, u32_suffix: *port_id, }); debug_assert!(finished_branch.port_mapping.contains_key(&port_id)); let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap(); all_constraints_satisfied = all_constraints_satisfied && mapped_port.last_registered_index == Some(*branch_id); } } if !all_constraints_satisfied { break; } } if !all_constraints_satisfied { continue; } // If here, then all constraints on the finished branch are // satisfied. But the finished branch also puts constraints on the // other connectors. So either: // 1. Add them to the list of constraints a peer connector should // adhere to. // 2. Make sure that the provided connector solution matches the // constraints imposed by the currently considered finished branch // // To make our lives a bit easier we already insert our proposed // local solution into a prepared global solution. This makes // looking up remote ports easier (since the channel might have its // two ends owned by the same connector). let mut new_solution = cur_solution.clone(); debug_assert!(!new_solution.remaining_connectors.contains(&target_connector)); new_solution.connector_constraints.remove(&target_connector); new_solution.connector_mapping.insert(target_connector, ProposedConnectorSolution{ final_branch_id: finished_branch.index, all_branch_ids: parent_branch_ids, port_mapping: finished_branch.port_mapping .iter() .map(|(port_id, port_desc)| { (port_id.0.u32_suffix, port_desc.last_registered_index) }) .collect(), }); for (local_port_id, port_desc) in &finished_branch.port_mapping { // Retrieve port of peer let port_info = self.ports.get(&local_port_id.0.u32_suffix).unwrap(); let peer_port_id = port_info.peer_id; let peer_port_info = self.ports.get(&peer_port_id).unwrap(); let peer_connector_id = peer_port_info.owning_connector_id.unwrap(); // If the connector was not present in the new global solution // yet, add it now, as it simplifies the following logic if !new_solution.connector_mapping.contains_key(&peer_connector_id) && !new_solution.remaining_connectors.contains(&peer_connector_id) { new_solution.connector_constraints.insert(peer_connector_id, Vec::new()); new_solution.remaining_connectors.push(peer_connector_id); } if new_solution.remaining_connectors.contains(&peer_connector_id) { // Constraint applies to a connector that has not yet been // visited debug_assert!(new_solution.connector_constraints.contains_key(&peer_connector_id)); debug_assert_ne!(peer_connector_id, target_connector); let new_constraint = if port_desc.num_times_fired == 0 { ProposedBranchConstraint::SilentPort(peer_port_id) } else if peer_port_info.is_getter { // Peer port is a getter, so we want its port to map to // the branch number in our port mapping. debug_assert!(port_desc.last_registered_index.is_some()); ProposedBranchConstraint::PortMapping(peer_port_id, port_desc.last_registered_index.unwrap()) } else { // Peer port is a putter, so we want to restrict the // solution's run to contain the branch ID we received. ProposedBranchConstraint::BranchNumber(port_desc.last_registered_index.unwrap()) }; let peer_constraints = new_solution.connector_constraints.get_mut(&peer_connector_id).unwrap(); if !peer_constraints.contains(&new_constraint) { peer_constraints.push(new_constraint); } } else { // Constraint applies to an already visited connector let peer_solution = new_solution.connector_mapping.get(&peer_connector_id).unwrap(); if port_desc.num_times_fired == 0 { let peer_mapped_id = peer_solution.port_mapping.get(&peer_port_id).unwrap(); if peer_mapped_id.is_some() { all_constraints_satisfied = false; break; } } else if peer_port_info.is_getter { // Peer is getter, so its port should be mapped to one // of our branch IDs. To simplify lookup we look at the // last message we sent to the getter. debug_assert!(port_desc.last_registered_index.is_some()); let peer_port = peer_solution.port_mapping.get(&peer_port_id) .map_or(None, |v| *v); if port_desc.last_registered_index != peer_port { // No match all_constraints_satisfied = false; break; } } else { // Peer is putter, so we expect to find our port mapping // to match one of the branch numbers in the peer // connector's local solution debug_assert!(port_desc.last_registered_index.is_some()); let expected_branch_id = port_desc.last_registered_index.unwrap(); if !peer_solution.all_branch_ids.contains(&expected_branch_id) { all_constraints_satisfied = false; break; } } } } if !all_constraints_satisfied { // Final checks failed continue 'branch_loop } // We're sure that this branch matches the provided solution, so // push it onto the list of considered solutions all_solutions.push(new_solution); } } fn commit_connector_solution(&mut self, connector_id: u32, branch_id: u32) { // Retrieve connector and branch let connector = self.connectors.get_mut(&connector_id).unwrap(); debug_assert_ne!(branch_id, 0); // because at 0 we have our initial backed-up non-sync branch debug_assert!(connector.in_sync); debug_assert!(connector.spec_branches_done.contains(&branch_id)); // Put the selected solution in front, the branch at index 0 is the // "non-sync" branch. connector.branches.swap(0, branch_id as usize); connector.branches.truncate(1); // And reset the connector's state for further execution connector.in_sync = false; connector.spec_branches_active.clear(); connector.spec_branches_pending_receive.clear(); connector.spec_branches_done.clear(); connector.last_checked_done = 0; connector.global_inbox.clear(); connector.global_outbox.clear(); // Do the same thing for the final selected branch let final_branch = &mut connector.branches[0]; final_branch.index = 0; final_branch.parent_index = None; debug_assert_eq!(final_branch.branch_state, BranchState::ReachedEndSync); final_branch.branch_state = BranchState::RunningNonSync; final_branch.message_inbox.clear(); final_branch.port_mapping.clear(); // Might be that the connector was no longer running, if so, put it back // in the list of connectors to run if !self.connectors_active.contains(&connector_id) { self.connectors_active.push_back(connector_id); } } fn generate_connector_id(&mut self) -> u32 { let id = self.connector_counter; self.connector_counter += 1; return id; } // ------------------------------------------------------------------------- // Helpers for port management // ------------------------------------------------------------------------- #[inline] fn add_owned_channel(ports: &mut HashMap, port_counter: &mut u32, owning_connector_id: Option) -> (u32, u32) { let get_id = *port_counter; let put_id = *port_counter + 1; (*port_counter) += 2; ports.insert(get_id, PortDesc{ id: get_id, peer_id: put_id, owning_connector_id, is_getter: true, }); ports.insert(put_id, PortDesc{ id: put_id, peer_id: get_id, owning_connector_id, is_getter: false, }); return (put_id, get_id); } // ------------------------------------------------------------------------- // Helpers for branch management // ------------------------------------------------------------------------- /// Prepares a speculative branch for further execution from the connector's /// non-speculative base branch. fn prepare_branch_for_sync(desc: &mut ConnectorDesc) { // Ensure only one branch is active, the non-sync branch debug_assert!(!desc.in_sync); debug_assert_eq!(desc.branches.len(), 1); debug_assert!(desc.spec_branches_active.is_empty()); let new_branch_index = 1; // Push first speculative branch as active branch let new_branch = BranchDesc::new_sync_from(new_branch_index, &desc.branches[0]); desc.branches.push(new_branch); desc.spec_branches_active.push_back(new_branch_index); desc.in_sync = true; } /// Duplicates a particular (speculative) branch and returns it. Due to /// borrowing rules in code that uses this helper the returned branch still /// needs to be pushed onto the member `branches`. fn duplicate_branch(desc: &ConnectorDesc, original_branch_idx: u32) -> BranchDesc { let original_branch = &desc.branches[original_branch_idx as usize]; debug_assert!(desc.in_sync); let copied_index = desc.branches.len() as u32; let copied_branch = BranchDesc::new_sync_from(copied_index, original_branch); return copied_branch; } /// Retrieves all parent IDs of a particular branch. These numbers run from /// the leaf towards the parent. fn determine_branch_ids(&self, desc: &ConnectorDesc, first_branch_index: u32, result: &mut Vec) { let mut next_branch_index = first_branch_index; result.clear(); loop { result.push(next_branch_index); let branch = &desc.branches[next_branch_index as usize]; match branch.parent_index { Some(index) => next_branch_index = index, None => return, } } } } /// Context accessible by the code while being executed by the runtime. When the /// code is being executed by the runtime it sometimes needs to interact with /// the runtime. This is achieved by the "code throwing an error code", after /// which the runtime modifies the appropriate variables and continues executing /// the code again. struct Context<'a> { // Temporary references to branch related storage inbox: &'a HashMap<(PortId, u32), ValueGroup>, port_mapping: &'a HashMap, branch_ctx: &'a mut BranchContext, } impl<'a> crate::protocol::RunContext for Context<'a> { fn did_put(&mut self, port: PortId) -> bool { // Note that we want "did put" to return false if we have fired zero // times, because this implies we did a prevous let old_value = self.branch_ctx.just_called_did_put; self.branch_ctx.just_called_did_put = false; return old_value; } fn get(&mut self, port: PortId) -> Option { let inbox_key = (port, 1); match self.inbox.get(&inbox_key) { None => None, Some(value) => Some(value.clone()), } } fn fires(&mut self, port: PortId) -> Option { match self.port_mapping.get(&port) { None => None, Some(port_info) => Some(Value::Bool(port_info.num_times_fired != 0)), } } fn get_channel(&mut self) -> Option<(Value, Value)> { self.branch_ctx.pending_channel.take() } } /// Recursively goes through the value group, attempting to find ports. /// Duplicates will only be added once. fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { // Helper to check a value for a port and recurse if needed. fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { match value { Value::Input(port_id) | Value::Output(port_id) => { // This is an actual port for prev_port in ports.iter() { if prev_port == port_id { // Already added return; } } ports.push(*port_id); }, Value::Array(heap_pos) | Value::Message(heap_pos) | Value::String(heap_pos) | Value::Struct(heap_pos) | Value::Union(_, heap_pos) => { // Reference to some dynamic thing which might contain ports, // so recurse let heap_region = &group.regions[*heap_pos as usize]; for embedded_value in heap_region { find_port_in_value(group, embedded_value, ports); } }, _ => {}, // values we don't care about } } // Clear the ports, then scan all the available values ports.clear(); for value in &value_group.values { find_port_in_value(value_group, value, ports); } } fn port_value_from_id(connector_id: Option, port_id: u32, is_output: bool) -> Value { let connector_id = connector_id.unwrap_or(u32::MAX); // TODO: @hack, review entire PortId/ConnectorId/Id system if is_output { return Value::Output(PortId(Id{ connector_id, u32_suffix: port_id })); } else { return Value::Input(PortId(Id{ connector_id, u32_suffix: port_id, })); } }