diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 81b976c34f313ec9e6852da87a32b261fb042b02..9a43e687e1389cb34c3b6ebd684ce1b71f1b49ce 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::collections::hash_map::{Entry}; use crate::{Polarity, PortId}; @@ -7,7 +7,6 @@ use crate::common::Id; use crate::protocol::*; use crate::protocol::eval::*; -use super::registry::Registry; use super::messages::*; enum AddComponentError { @@ -27,7 +26,6 @@ struct ConnectorDesc { id: u32, in_sync: bool, branches: Vec, // first one is always non-speculative one - branch_id_counter: u32, spec_branches_active: VecDeque, // branches that can be run immediately spec_branches_pending_receive: HashMap>, // from port_id to branch index spec_branches_done: Vec, @@ -47,7 +45,6 @@ impl ConnectorDesc { id, in_sync: false, branches: vec![BranchDesc::new_non_sync(component_state, owned_ports)], - branch_id_counter: 1, spec_branches_active: branches_active, spec_branches_pending_receive: HashMap::new(), spec_branches_done: Vec::new(), @@ -58,6 +55,7 @@ impl ConnectorDesc { } } +#[derive(Debug, PartialEq, Eq)] enum BranchState { RunningNonSync, // regular running non-speculative branch RunningSync, // regular running speculative branch @@ -66,15 +64,15 @@ enum BranchState { Failed, // branch that became inconsistent } +#[derive(Clone)] struct BranchPortDesc { - last_registered_identifier: Option, // if putter, then last sent branch ID, if getter, then last received branch ID + last_registered_index: Option, // if putter, then last sent branch ID, if getter, then last received branch ID num_times_fired: u32, // number of puts/gets on this port } struct BranchDesc { index: u32, parent_index: Option, - identifier: u32, code_state: ComponentState, branch_state: BranchState, owned_ports: Vec, @@ -88,7 +86,6 @@ impl BranchDesc { Self{ index: 0, parent_index: None, - identifier: 0, code_state: component_state, branch_state: BranchState::RunningNonSync, owned_ports, @@ -100,11 +97,10 @@ impl BranchDesc { /// Creates a sync branch based on the supplied branch. This supplied branch /// is the branching point for the new one, i.e. the parent in the branching /// tree. - fn new_sync_from(index: u32, identifier: u32, branch_state: &BranchDesc) -> Self { + fn new_sync_from(index: u32, branch_state: &BranchDesc) -> Self { Self{ index, parent_index: Some(branch_state.index), - identifier, code_state: branch_state.code_state.clone(), branch_state: BranchState::RunningSync, owned_ports: branch_state.owned_ports.clone(), @@ -114,75 +110,42 @@ impl BranchDesc { } } -// Separate from Runtime for borrowing reasons -struct Registry { - ports: HashMap, - port_counter: u32, - connectors: HashMap, - connector_counter: u32, +#[derive(PartialEq, Eq)] +enum Scheduling { + Immediate, + Later, + NotNow, } -impl Registry { - fn new() -> Self { - Self{ - ports: HashMap::new(), - port_counter: 0, - connectors: HashMap::new(), - connector_counter: 0, - } - } - - /// Returns (putter_port, getter_port) - pub fn add_channel(&mut self, owning_connector_id: Option) -> (u32, u32) { - let get_id = self.generate_port_id(); - let put_id = self.generate_port_id(); - - self.ports.insert(get_id, PortDesc{ - id: get_id, - peer_id: put_id, - owning_connector_id, - is_getter: true, - }); - self.ports.insert(put_id, PortDesc{ - id: put_id, - peer_id: get_id, - owning_connector_id, - is_getter: false, - }); - - return (put_id, get_id); - } - - fn generate_port_id(&mut self) -> u32 { - let id = self.port_counter; - self.port_counter += 1; - return id; - } -} - -#[derive(Clone, Copy, Eq, PartialEq)] +#[derive(Clone, Copy, Eq, PartialEq, Debug)] enum ProposedBranchConstraint { SilentPort(u32), // port id BranchNumber(u32), // branch id + PortMapping(u32, u32), // particular port's mapped branch number } // Local solution of the connector +#[derive(Clone)] struct ProposedConnectorSolution { final_branch_id: u32, all_branch_ids: Vec, // the final branch ID and, recursively, all parents - silent_ports: Vec, // port IDs of the connector itself + port_mapping: HashMap>, // port IDs of the connector, mapped to their branch IDs (None for silent ports) } +#[derive(Clone)] struct ProposedSolution { connector_mapping: HashMap, // from connector ID to branch ID - connector_propositions: HashMap>, // from connector ID to encountered branch numbers + connector_constraints: HashMap>, // from connector ID to encountered branch numbers remaining_connectors: Vec, // connectors that still need to be visited } // TODO: @performance, use freelists+ids instead of HashMaps struct Runtime { protocol: Arc, - registry: Registry, + ports: HashMap, + port_counter: u32, + connectors: HashMap, + connector_counter: u32, connectors_active: VecDeque, } @@ -190,7 +153,10 @@ impl Runtime { pub fn new(pd: Arc) -> Self { Self{ protocol: pd, - registry: Registry::new(), + ports: HashMap::new(), + port_counter: 0, + connectors: HashMap::new(), + connector_counter: 0, connectors_active: VecDeque::new(), } } @@ -199,7 +165,7 @@ impl Runtime { /// endpoints. The returned values are of the (putter port, getter port) /// respectively. pub fn add_channel(&mut self) -> (Value, Value) { - let (put_id, get_id) = self.registry.add_channel(None); + let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, None); return ( port_value_from_id(None, put_id, true), port_value_from_id(None, get_id, false) @@ -255,7 +221,7 @@ impl Runtime { let component_state = self.protocol.new_component(module.as_bytes(), procedure.as_bytes(), &ports); let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); - self.registry.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports)); + self.connectors.insert(component_id, ConnectorDesc::new(component_id, component_state, ports)); self.connectors_active.push_back(component_id); Ok(()) @@ -264,16 +230,24 @@ impl Runtime { pub fn run(&mut self) { // Go through all active connectors while !self.connectors_active.is_empty() { - // Run a single connector + // Run a single connector until it indicates we can run another + // connector let next_id = self.connectors_active.pop_front().unwrap(); - let run_again = self.run_connector(next_id); + let mut scheduling = Scheduling::Immediate; + + while scheduling == Scheduling::Immediate { + scheduling = self.run_connector(next_id); + } - if run_again { - self.connectors_active.push_back(next_id); + match scheduling { + Scheduling::Immediate => unreachable!(), + Scheduling::Later => self.connectors_active.push_back(next_id), + Scheduling::NotNow => {}, } + // Deal with any outgoing messages and potential solutions self.empty_connector_outbox(next_id); - self.check_connector_solution(next_id); + self.check_connector_new_solutions(next_id); } } @@ -281,271 +255,327 @@ impl Runtime { /// connector should be run again in the future, and return `false` if the /// connector has terminated. Note that a terminated connector still /// requires cleanup. - pub fn run_connector(&mut self, id: u32) -> bool { - let desc = self.registry.connectors.get_mut(&id).unwrap(); - let mut run_context = Context{ - connector_id: id, - branch_id: None, - pending_channel: None, - }; + pub fn run_connector(&mut self, connector_id: u32) -> Scheduling { + let desc = self.connectors.get_mut(&connector_id).unwrap(); - let mut call_again = false; // TODO: Come back to this, silly pattern + if desc.in_sync { + return self.run_connector_sync_mode(connector_id); + } else { + return self.run_connector_regular_mode(connector_id); + } + } - while call_again { - call_again = false; // bit of a silly pattern, maybe revise + #[inline] + fn run_connector_sync_mode(&mut self, connector_id: u32) -> Scheduling { + // Retrieve connector and branch that is supposed to be run + let desc = self.connectors.get_mut(&connector_id).unwrap(); + debug_assert!(desc.in_sync); + debug_assert!(!desc.spec_branches_active.is_empty()); - if desc.in_sync { - // Running in synchronous mode, so run all branches until their - // blocking point - debug_assert!(!desc.spec_branches_active.is_empty()); - let branch_index = desc.spec_branches_active.pop_front().unwrap(); + let branch_index = desc.spec_branches_active.pop_front().unwrap(); + let branch = &mut desc.branches[branch_index as usize]; - let branch = &mut desc.branches[branch_index as usize]; - let run_result = branch.code_state.run(&mut run_context, &self.protocol); + // Run this particular branch to a next blocking point + // TODO: PERSISTENT RUN CTX + let mut run_context = Context{ + inbox: &branch.message_inbox, + port_mapping: &branch.port_mapping, + connector_id, + branch_id: Some(branch_index), + just_called_did_put: false, + pending_channel: None, + }; - match run_result { - RunResult::BranchInconsistent => { - // Speculative branch became inconsistent. So we don't - // run it again - branch.branch_state = BranchState::Failed; - }, - RunResult::BranchMissingPortState(port_id) => { - // Branch called `fires()` on a port that did not have a - // value assigned yet. So branch and keep running - debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); - debug_assert!(branch.port_mapping.get(&port_id).is_none()); - - let copied_index = Self::duplicate_branch(desc, branch_index); - - // Need to re-borrow to assign changed port state - let original_branch = &mut desc.branches[branch_index as usize]; - original_branch.port_mapping.insert(port_id, BranchPortDesc{ - last_registered_identifier: None, - num_times_fired: 0, - }); + let run_result = branch.code_state.run(&mut run_context, &self.protocol); - let copied_branch = &mut desc.branches[copied_index as usize]; - copied_branch.port_mapping.insert(port_id, BranchPortDesc{ - last_registered_identifier: None, + match run_result { + RunResult::BranchInconsistent => { + // Speculative branch became inconsistent. So we don't + // run it again + branch.branch_state = BranchState::Failed; + }, + RunResult::BranchMissingPortState(port_id) => { + // Branch called `fires()` on a port that did not have a + // value assigned yet. So branch and keep running. + debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); + debug_assert!(branch.port_mapping.get(&port_id).is_none()); + + let mut copied_branch = Self::duplicate_branch(desc, branch_index); + let copied_index = copied_branch.index; + + copied_branch.port_mapping.insert(port_id, BranchPortDesc{ + last_registered_index: None, + num_times_fired: 1, + }); + + let branch = &mut desc.branches[branch_index as usize]; // need to reborrow + branch.port_mapping.insert(port_id, BranchPortDesc{ + last_registered_index: None, + num_times_fired: 0, + }); + + // Run both again + desc.branches.push(copied_branch); + desc.spec_branches_active.push_back(branch_index); + desc.spec_branches_active.push_back(copied_index); + + return Scheduling::Immediate; + }, + RunResult::BranchMissingPortValue(port_id) => { + // Branch just performed a `get()` on a port that did + // not yet receive a value. + + // First check if a port value is assigned to the + // current branch. If so, check if it is consistent. + debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); + let mut insert_in_pending_receive = false; + + match branch.port_mapping.entry(port_id) { + Entry::Vacant(entry) => { + // No entry yet, so force to firing + entry.insert(BranchPortDesc{ + last_registered_index: None, num_times_fired: 1, }); - - // Run both again - desc.spec_branches_active.push_back(branch_index); - desc.spec_branches_active.push_back(copied_index); + branch.branch_state = BranchState::BranchPoint; + insert_in_pending_receive = true; }, - RunResult::BranchMissingPortValue(port_id) => { - // Branch just performed a `get()` on a port that did - // not yet receive a value. - - // First check if a port value is assigned to the - // current branch. If so, check if it is consistent. - debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); - let mut insert_in_pending_receive = false; - - match branch.port_mapping.entry(port_id) { - Entry::Vacant(entry) => { - // No entry yet, so force to firing - entry.insert(BranchPortDesc{ - last_registered_identifier: None, - num_times_fired: 1, - }); - branch.branch_state = BranchState::BranchPoint; - insert_in_pending_receive = true; - }, - Entry::Occupied(entry) => { - // Have an entry, check if it is consistent - let entry = entry.get(); - if entry.num_times_fired == 0 { - // Inconsistent - branch.branch_state = BranchState::Failed; - } else { - // Perfectly fine, add to queue - debug_assert!(entry.last_registered_identifier.is_none()); - assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now"); - branch.branch_state = BranchState::BranchPoint; - insert_in_pending_receive = true; - } - } + Entry::Occupied(entry) => { + // Have an entry, check if it is consistent + let entry = entry.get(); + if entry.num_times_fired == 0 { + // Inconsistent + branch.branch_state = BranchState::Failed; + } else { + // Perfectly fine, add to queue + debug_assert!(entry.last_registered_index.is_none()); + assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now"); + branch.branch_state = BranchState::BranchPoint; + insert_in_pending_receive = true; } + } + } - if insert_in_pending_receive { - // Perform the insert - match desc.spec_branches_pending_receive.entry(port_id) { - Entry::Vacant(entry) => { - entry.insert(vec![branch_index]); - } - Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - debug_assert!(!entry.contains(&branch_index)); - entry.push(branch_index); - } - } - - // But also check immediately if we don't have a - // previously received message. If so, we - // immediately branch and accept the message - if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) { - for message in messages { - let new_branch_idx = Self::duplicate_branch(desc, branch_index); - let new_branch = &mut desc.branches[new_branch_idx as usize]; - let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap(); - new_port_desc.last_registered_identifier = Some(message.peer_cur_branch_id); - new_branch.message_inbox.insert((port_id, 1), message.message.clone()); - - desc.spec_branches_active.push_back(new_branch_idx); - } - } + if insert_in_pending_receive { + // Perform the insert + match desc.spec_branches_pending_receive.entry(port_id) { + Entry::Vacant(entry) => { + entry.insert(vec![branch_index]); } - }, - RunResult::BranchAtSyncEnd => { - // Check the branch for any ports that were not used and - // insert them in the port mapping as not having fired. - for port_index in branch.owned_ports { - let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index }); - if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) { - entry.insert(BranchPortDesc { - last_registered_identifier: None, - num_times_fired: 0 - }); - } + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + debug_assert!(!entry.contains(&branch_index)); + entry.push(branch_index); } + } - // Mark the branch as being done - branch.branch_state = BranchState::ReachedEndSync; - desc.spec_branches_done.push(branch_index); - }, - RunResult::BranchPut(port_id, value_group) => { - debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); - debug_assert_eq!(value_group.values.len(), 1); // can only send one value - - // Branch just performed a `put()`. Check if we have - // assigned the port value and if so, if it is - // consistent. - let mut can_put = true; - match branch.port_mapping.entry(port_id) { - Entry::Vacant(entry) => { - // No entry yet - entry.insert(BranchPortDesc{ - last_registered_identifier: Some(branch.identifier), - num_times_fired: 1, - }); - }, - Entry::Occupied(mut entry) => { - // Pre-existing entry - let entry = entry.get_mut(); - if entry.num_times_fired == 0 { - // This is 'fine' in the sense that we have - // a normal inconsistency in the branch. - branch.branch_state = BranchState::Failed; - can_put = false; - } else if entry.last_registered_identifier.is_none() { - // A put() that follows a fires() - entry.last_registered_identifier = Some(branch.identifier); - } else { - // This should be fine in the future. But - // for now we throw an error as it doesn't - // mesh well with the 'fires()' concept. - todo!("throw an error of some sort, then fail all related") - } - } + // But also check immediately if we don't have a + // previously received message. If so, we + // immediately branch and accept the message + if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) { + for message in messages { + let mut new_branch = Self::duplicate_branch(desc, branch_index); + let new_branch_idx = new_branch.index; + let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap(); + new_port_desc.last_registered_index = Some(message.peer_cur_branch_id); + new_branch.message_inbox.insert((port_id, 1), message.message.clone()); + + desc.branches.push(new_branch); + desc.spec_branches_active.push_back(new_branch_idx); } - if can_put { - // Actually put the message in the outbox - let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap(); - let peer_id = port_desc.peer_id; - let peer_desc = self.registry.ports.get(&peer_id).unwrap(); - debug_assert!(peer_desc.owning_connector_id.is_some()); - - let peer_id = PortId(Id{ - connector_id: peer_desc.owning_connector_id.unwrap(), - u32_suffix: peer_id - }); - - // For now this is the one and only time we're going - // to send a message. So for now we can't send a - // branch ID. - desc.global_outbox.insert((port_id, 1), BufferedMessage{ - sending_port: port_id, - receiving_port: peer_id, - peer_prev_branch_id: None, - peer_cur_branch_id: 0, - message: value_group, - }); - - // Finally, because we were able to put the message, - // we can run the branch again - desc.spec_branches_active.push_back(branch_index); - call_again = true; + if !messages.is_empty() { + return Scheduling::Immediate; } - }, - _ => unreachable!("got result '{:?}' from running component in sync mode", run_result), + } } - } else { - // Running in non-synchronous mode - let branch = &mut desc.branches[0]; - let run_result = branch.code_state.run(&mut run_context, &self.protocol); - - match run_result { - RunResult::ComponentTerminated => return false, - RunResult::ComponentAtSyncStart => { - // Prepare for sync execution - Self::prepare_branch_for_sync(desc); - call_again = true; + }, + RunResult::BranchAtSyncEnd => { + // Check the branch for any ports that were not used and + // insert them in the port mapping as not having fired. + for port_index in branch.owned_ports.iter().copied() { + let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index }); + if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) { + entry.insert(BranchPortDesc { + last_registered_index: None, + num_times_fired: 0 + }); + } + } + + // Mark the branch as being done + branch.branch_state = BranchState::ReachedEndSync; + desc.spec_branches_done.push(branch_index); + }, + RunResult::BranchPut(port_id, value_group) => { + debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); + debug_assert_eq!(value_group.values.len(), 1); // can only send one value + + // Branch just performed a `put()`. Check if we have + // assigned the port value and if so, if it is + // consistent. + let mut can_put = true; + match branch.port_mapping.entry(port_id) { + Entry::Vacant(entry) => { + // No entry yet + entry.insert(BranchPortDesc{ + last_registered_index: Some(branch.index), + num_times_fired: 1, + }); }, - RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { - // Generate a new connector with its own state - let new_component_id = self.generate_connector_id(); - let new_component_state = ComponentState { - prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments) - }; - - // Transfer the ownership of any ports to the new connector - let mut ports = Vec::with_capacity(arguments.values.len()); - find_ports_in_value_group(&arguments, &mut ports); - for port_id in &ports { - let port = self.registry.ports.get_mut(&port_id.0.u32_suffix).unwrap(); - debug_assert_eq!(port.owning_connector_id.unwrap(), run_context.connector_id); - port.owning_connector_id = Some(new_component_id) + Entry::Occupied(mut entry) => { + // Pre-existing entry + let entry = entry.get_mut(); + if entry.num_times_fired == 0 { + // This is 'fine' in the sense that we have + // a normal inconsistency in the branch. + branch.branch_state = BranchState::Failed; + can_put = false; + } else if entry.last_registered_index.is_none() { + // A put() that follows a fires() + entry.last_registered_index = Some(branch.index); + } else { + // This should be fine in the future. But + // for now we throw an error as it doesn't + // mesh well with the 'fires()' concept. + todo!("throw an error of some sort, then fail all related") } + } + } - // Finally push the new connector into the registry - let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); - self.registry.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports)); - self.connectors_active.push_back(new_component_id); - }, - RunResult::NewChannel => { - // Prepare channel - debug_assert!(run_context.pending_channel.is_none()); - let (put_id, get_id) = self.registry.add_channel(Some(run_context.connector_id)); - run_context.pending_channel = Some(( - port_value_from_id(Some(run_context.connector_id), put_id, true), - port_value_from_id(Some(run_context.connector_id), get_id, false) - )); - - // Call again so it is retrieved from the context - call_again = true; - }, - _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result), + if can_put { + // Actually put the message in the outbox + let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap(); + let peer_id = port_desc.peer_id; + let peer_desc = self.ports.get(&peer_id).unwrap(); + debug_assert!(peer_desc.owning_connector_id.is_some()); + + let peer_id = PortId(Id{ + connector_id: peer_desc.owning_connector_id.unwrap(), + u32_suffix: peer_id + }); + + // For now this is the one and only time we're going + // to send a message. So for now we can't send a + // branch ID. + desc.global_outbox.insert_message(BufferedMessage{ + sending_port: port_id, + receiving_port: peer_id, + peer_prev_branch_id: None, + peer_cur_branch_id: 0, + message: value_group, + }); + + // Finally, because we were able to put the message, + // we can run the branch again + desc.spec_branches_active.push_back(branch_index); + return Scheduling::Immediate; } - } + }, + _ => unreachable!("got result '{:?}' from running component in sync mode", run_result), } - return true; + // Did not return that we need to immediately schedule again, so + // determine if we want to do so based on the current number of active + // speculative branches + if desc.spec_branches_active.is_empty() { + return Scheduling::NotNow; + } else { + return Scheduling::Later; + } + } + + #[inline] + fn run_connector_regular_mode(&mut self, connector_id: u32) -> Scheduling { + // Retrieve the connector and the branch (which is always the first one, + // since we assume we're not running in sync-mode). + // TODO: CONTINUE HERE, PERSEISTENT BRANCH CONTEXT + let desc = self.connectors.get_mut(&connector_id).unwrap(); + debug_assert!(!desc.in_sync); + debug_assert_eq!(desc.branches.len(), 1); + + let branch = &mut desc.branches[0]; + + // Run this branch to its blocking point + let mut run_context = Context{ + inbox: &branch.message_inbox, + port_mapping: &branch.port_mapping, + connector_id, + branch_id: None, + just_called_did_put: false, + pending_channel: None, + }; + let run_result = branch.code_state.run(&mut run_context, &self.protocol); + + match run_result { + RunResult::ComponentTerminated => return Scheduling::NotNow, + RunResult::ComponentAtSyncStart => { + // Prepare for sync execution + Self::prepare_branch_for_sync(desc); + return Scheduling::Immediate; + }, + RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { + // Find all references to ports in the provided arguments, the + // ownership of these ports will be transferred to the connector + // we're about to create. + let mut ports = Vec::with_capacity(arguments.values.len()); + find_ports_in_value_group(&arguments, &mut ports); + + // Generate a new connector with its own state + let new_component_id = self.generate_connector_id(); + let new_component_state = ComponentState { + prompt: Prompt::new(&self.protocol.types, &self.protocol.heap, definition_id, monomorph_idx, arguments) + }; + + for port_id in &ports { + let port = self.ports.get_mut(&port_id.0.u32_suffix).unwrap(); + debug_assert_eq!(port.owning_connector_id.unwrap(), connector_id); + port.owning_connector_id = Some(new_component_id) + } + + // Finally push the new connector into the registry + let ports = ports.into_iter().map(|v| v.0.u32_suffix).collect(); + self.connectors.insert(new_component_id, ConnectorDesc::new(new_component_id, new_component_state, ports)); + self.connectors_active.push_back(new_component_id); + + return Scheduling::Immediate; + }, + RunResult::NewChannel => { + // Prepare channel + debug_assert!(run_context.pending_channel.is_none()); + let (put_id, get_id) = Self::add_owned_channel(&mut self.ports, &mut self.port_counter, Some(connector_id)); + run_context.pending_channel = Some(( + port_value_from_id(Some(connector_id), put_id, true), + port_value_from_id(Some(connector_id), get_id, false) + )); + + return Scheduling::Immediate; + }, + _ => unreachable!("got result '{:?}' from running component in non-sync mode", run_result), + } } /// Puts all the messages that are currently in the outbox of a particular /// connector into the inbox of the receivers. If possible then branches /// will be created that receive those messages. fn empty_connector_outbox(&mut self, connector_index: u32) { - let connector = self.registry.connectors.get_mut(&connector_index).unwrap(); - while let Some(message_to_send) = connector.global_outbox.take_next_message_to_send() { + loop { + let connector = self.connectors.get_mut(&connector_index).unwrap(); + let message_to_send = connector.global_outbox.take_next_message_to_send(); + + if message_to_send.is_none() { + return; + } + + // We have a message to send + let message_to_send = message_to_send.unwrap(); + // Lookup the target connector - let port_desc = self.registry.ports.get(&target_port.0.u32_suffix).unwrap(); + let target_port = message_to_send.receiving_port; + let port_desc = self.ports.get(&target_port.0.u32_suffix).unwrap(); debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id); let target_connector_id = port_desc.owning_connector_id.unwrap(); - let target_connector = self.registry.connectors.get_mut(&target_connector_id).unwrap(); + let target_connector = self.connectors.get_mut(&target_connector_id).unwrap(); // In any case, always put the message in the global inbox target_connector.global_inbox.insert_message(message_to_send.clone()); @@ -562,7 +592,7 @@ impl Runtime { let mut can_branch = false; if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) { - if port_desc.last_registered_identifier == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 { + if port_desc.last_registered_index == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 { can_branch = true; } } @@ -570,14 +600,15 @@ impl Runtime { if can_branch { // Put the message inside a clone of the currently // waiting branch - let new_branch_idx = Self::duplicate_branch(target_connector, *branch_index); - let new_branch = &mut target_connector.branches[new_branch_idx as usize]; + let mut new_branch = Self::duplicate_branch(target_connector, *branch_index); + let new_branch_idx = new_branch.index; let new_port_desc = &mut new_branch.port_mapping.get_mut(&message_to_send.receiving_port).unwrap(); - new_port_desc.last_registered_identifier = Some(message_to_send.peer_cur_branch_id); + new_port_desc.last_registered_index = Some(message_to_send.peer_cur_branch_id); new_branch.message_inbox.insert((message_to_send.receiving_port, 1), message_to_send.message.clone()); // And queue the branch for further execution - target_connector.spec_branches_active.push(new_branch_idx); + target_connector.branches.push(new_branch); + target_connector.spec_branches_active.push_back(new_branch_idx); if !self.connectors_active.contains(&target_connector.id) { self.connectors_active.push_back(target_connector.id); } @@ -590,55 +621,68 @@ impl Runtime { /// Checks a connector for the submitted solutions. After all neighbouring /// connectors have been checked all of their "last checked solution" index /// will be incremented. - fn check_connector_new_solutions(&mut self, connector_index: u32) { + fn check_connector_new_solutions(&mut self, connector_id: u32) { // Take connector and start processing its solutions - let connector = self.registry.connectors.get_mut(&connector_index).unwrap(); - let mut considered_connectors = HashSet::new(); - let mut valid_solutions = Vec::new(); + loop { + let connector = self.connectors.get_mut(&connector_id).unwrap(); + if connector.last_checked_done == connector.spec_branches_done.len() as u32 { + // Nothing to do + return; + } - while connector.last_checked_done != connector.spec_branches_done.len() as u32 { - // We have a new solution to consider + // We have a new solution let start_branch_index = connector.spec_branches_done[connector.last_checked_done as usize]; connector.last_checked_done += 1; - let branch = &connector.branches[start_branch_index as usize]; - debug_assert_eq!(branch.branch_state, BranchState::ReachedEndSync); - - // Clear storage for potential solutions - considered_connectors.clear(); - - // Start seeking solution among other connectors within the same - // synchronous region - considered_connectors.insert(connector.id); - for port in branch.port_ + // Check the connector+branch combination to see if a global + // solution has already been found + if let Some(global_solution) = self.check_connector_solution(connector_id, start_branch_index) { + // Found a global solution, apply it to all the connectors that + // participate + for (connector_id, local_solution) in global_solution.connector_mapping { + self.commit_connector_solution(connector_id, local_solution.final_branch_id); + } + } } } - fn check_connector_solution(&self, first_connector_index: u32, first_branch_index: u32) { + fn check_connector_solution(&mut self, first_connector_index: u32, first_branch_index: u32) -> Option { // Take the connector and branch of interest - let first_connector = self.registry.connectors.get(&first_connector_index).unwrap(); + let first_connector = self.connectors.get(&first_connector_index).unwrap(); let first_branch = &first_connector.branches[first_branch_index as usize]; debug_assert_eq!(first_branch.branch_state, BranchState::ReachedEndSync); // Setup the first solution let mut first_solution = ProposedSolution{ connector_mapping: HashMap::new(), - connector_propositions: HashMap::new(), + connector_constraints: HashMap::new(), remaining_connectors: Vec::new(), }; - first_solution.connector_mapping.insert(first_connector.id, first_branch.identifier); + let mut first_local_solution = ProposedConnectorSolution{ + final_branch_id: first_branch.index, + all_branch_ids: Vec::new(), + port_mapping: first_branch.port_mapping + .iter() + .map(|(port_id, port_info)| { + (port_id.0.u32_suffix, port_info.last_registered_index) + }) + .collect(), + }; + self.determine_branch_ids(first_connector, first_branch.index, &mut first_local_solution.all_branch_ids); + first_solution.connector_mapping.insert(first_connector.id, first_local_solution); + for (port_id, port_mapping) in first_branch.port_mapping.iter() { - let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap(); + let port_desc = self.ports.get(&port_id.0.u32_suffix).unwrap(); let peer_port_id = port_desc.peer_id; - let peer_port_desc = self.registry.ports.get(&peer_port_id).unwrap(); + let peer_port_desc = self.ports.get(&peer_port_id).unwrap(); let peer_connector_id = peer_port_desc.owning_connector_id.unwrap(); - let constraint = match port_mapping.last_registered_identifier { + let constraint = match port_mapping.last_registered_index { Some(branch_id) => ProposedBranchConstraint::BranchNumber(branch_id), None => ProposedBranchConstraint::SilentPort(peer_port_id), }; - match first_solution.connector_propositions.entry(peer_connector_id) { + match first_solution.connector_constraints.entry(peer_connector_id) { Entry::Vacant(entry) => { // Not yet encountered entry.insert(vec![constraint]); @@ -661,93 +705,269 @@ impl Runtime { while !all_solutions.is_empty() { let mut cur_solution = all_solutions.pop().unwrap(); + if cur_solution.remaining_connectors.is_empty() { + // All connectors have been visited, so commit the solution + debug_assert!(cur_solution.connector_constraints.is_empty()); + return Some(cur_solution); + } else { + // Not all connectors have been visited yet, so take one of the + // connectors and visit it. + let target_connector = cur_solution.remaining_connectors.pop().unwrap(); + self.merge_solution_with_connector(&mut cur_solution, &mut all_solutions, target_connector); + } } + + // No satisfying solution found + return None; } fn merge_solution_with_connector(&self, cur_solution: &mut ProposedSolution, all_solutions: &mut Vec, target_connector: u32) { debug_assert!(!cur_solution.connector_mapping.contains_key(&target_connector)); // not yet visited - debug_assert!(cur_solution.connector_propositions.contains_key(&target_connector)); // but we encountered a reference to it + debug_assert!(cur_solution.connector_constraints.contains_key(&target_connector)); // but we encountered a reference to it - let branch_propositions = cur_solution.connector_propositions.get(&target_connector).unwrap(); - let cur_connector = self.registry.connectors.get(&target_connector).unwrap(); + let branch_constraints = cur_solution.connector_constraints.get(&target_connector).unwrap(); + let cur_connector = self.connectors.get(&target_connector).unwrap(); // Make sure all propositions are unique - for i in 0..branch_propositions.len() { - let proposition_i = branch_propositions[i]; + for i in 0..branch_constraints.len() { + let proposition_i = branch_constraints[i]; for j in 0..i { - let proposition_j = branch_propositions[j]; + let proposition_j = branch_constraints[j]; debug_assert_ne!(proposition_i, proposition_j); } } - // Check connector for compatible branches - let mut considered_branches = Vec::with_capacity(cur_connector.spec_branches_done.len()); - let mut encountered_propositions = Vec::new(); + // Go through the current connector's branches that have finished + 'branch_loop: for finished_branch_idx in cur_connector.spec_branches_done.iter().copied() { + let finished_branch = &cur_connector.branches[finished_branch_idx as usize]; - 'finished_branch_loop: for branch_idx in cur_connector.spec_branches_done { - // Reset the propositions matching variables - encountered_propositions.clear(); - encountered_propositions.resize(branch_propositions.len(), false); + // Construct a list of all the parent branch numbers + let mut parent_branch_ids = Vec::new(); + self.determine_branch_ids(cur_connector, finished_branch_idx, &mut parent_branch_ids); - // First check the silent port propositions - let cur_branch = &cur_connector.branches[branch_idx as usize]; - for (proposition_idx, proposition) in branch_propositions.iter().enumerate() { - match proposition { - ProposedBranchConstraint::SilentPort(port_id) => { - let old_school_port_id = PortId(Id{ connector_id: cur_connector.id, u32_suffix: *port_id }); - let port_mapping = cur_branch.port_mapping.get(&old_school_port_id).unwrap(); - if port_mapping.num_times_fired != 0 { - // Port did fire, so the current branch is not - // compatible - continue 'finished_branch_loop; - } + // Go through all constraints and make sure they are satisfied by + // the current branch + let mut all_constraints_satisfied = true; - // Otherwise, the port was silent indeed - encountered_propositions[proposition_idx] = true; + for constraint in branch_constraints { + match constraint { + ProposedBranchConstraint::SilentPort(port_id) => { + // Specified should have remained silent + let port_id = PortId(Id{ + connector_id: target_connector, + u32_suffix: *port_id, + }); + debug_assert!(finished_branch.port_mapping.contains_key(&port_id)); + let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap(); + all_constraints_satisfied = all_constraints_satisfied && mapped_port.num_times_fired == 0; }, - ProposedBranchConstraint::BranchNumber(_) => {}, - } - } - - // Then check the branch number propositions - let mut parent_branch_idx = branch_idx; - loop { - let branch = &cur_connector.branches[parent_branch_idx as usize]; - for proposition_idx in 0..branch_propositions.len() { - let proposition = branch_propositions[proposition_idx]; - match proposition { - ProposedBranchConstraint::SilentPort(_) => {}, - ProposedBranchConstraint::BranchNumber(branch_number) => { - if branch_number == branch.identifier { - encountered_propositions[proposition_idx] = true; - } - } + ProposedBranchConstraint::BranchNumber(branch_id) => { + // Branch number should have appeared in the + // predecessor branches. + all_constraints_satisfied = all_constraints_satisfied && parent_branch_ids.contains(branch_id); + }, + ProposedBranchConstraint::PortMapping(port_id, branch_id) => { + // Port should map to a particular branch number + let port_id = PortId(Id{ + connector_id: target_connector, + u32_suffix: *port_id, + }); + debug_assert!(finished_branch.port_mapping.contains_key(&port_id)); + let mapped_port = finished_branch.port_mapping.get(&port_id).unwrap(); + all_constraints_satisfied = all_constraints_satisfied && mapped_port.last_registered_index == Some(*branch_id); } } - if branch.parent_index.is_none() { - // No more parents + if !all_constraints_satisfied { break; } + } + + if !all_constraints_satisfied { + continue; + } + + // If here, then all constraints on the finished branch are + // satisfied. But the finished branch also puts constraints on the + // other connectors. So either: + // 1. Add them to the list of constraints a peer connector should + // adhere to. + // 2. Make sure that the provided connector solution matches the + // constraints imposed by the currently considered finished branch + // + // To make our lives a bit easier we already insert our proposed + // local solution into a prepared global solution. This makes + // looking up remote ports easier (since the channel might have its + // two ends owned by the same connector). + let mut new_solution = cur_solution.clone(); + debug_assert!(!new_solution.remaining_connectors.contains(&target_connector)); + new_solution.connector_constraints.remove(&target_connector); + new_solution.connector_mapping.insert(target_connector, ProposedConnectorSolution{ + final_branch_id: finished_branch.index, + all_branch_ids: parent_branch_ids, + port_mapping: finished_branch.port_mapping + .iter() + .map(|(port_id, port_desc)| { + (port_id.0.u32_suffix, port_desc.last_registered_index) + }) + .collect(), + }); + + for (local_port_id, port_desc) in &finished_branch.port_mapping { + // Retrieve port of peer + let port_info = self.ports.get(&local_port_id.0.u32_suffix).unwrap(); + let peer_port_id = port_info.peer_id; + let peer_port_info = self.ports.get(&peer_port_id).unwrap(); + let peer_connector_id = peer_port_info.owning_connector_id.unwrap(); + + // If the connector was not present in the new global solution + // yet, add it now, as it simplifies the following logic + if !new_solution.connector_mapping.contains_key(&peer_connector_id) && !new_solution.remaining_connectors.contains(&peer_connector_id) { + new_solution.connector_constraints.insert(peer_connector_id, Vec::new()); + new_solution.remaining_connectors.push(peer_connector_id); + } - parent_branch_idx = branch.parent_index.unwrap(); + if new_solution.remaining_connectors.contains(&peer_connector_id) { + // Constraint applies to a connector that has not yet been + // visited + debug_assert!(new_solution.connector_constraints.contains_key(&peer_connector_id)); + debug_assert_ne!(peer_connector_id, target_connector); + + let new_constraint = if port_desc.num_times_fired == 0 { + ProposedBranchConstraint::SilentPort(peer_port_id) + } else if peer_port_info.is_getter { + // Peer port is a getter, so we want its port to map to + // the branch number in our port mapping. + debug_assert!(port_desc.last_registered_index.is_some()); + ProposedBranchConstraint::PortMapping(peer_port_id, port_desc.last_registered_index.unwrap()) + } else { + // Peer port is a putter, so we want to restrict the + // solution's run to contain the branch ID we received. + ProposedBranchConstraint::BranchNumber(port_desc.last_registered_index.unwrap()) + }; + + let peer_constraints = new_solution.connector_constraints.get_mut(&peer_connector_id).unwrap(); + if !peer_constraints.contains(&new_constraint) { + peer_constraints.push(new_constraint); + } + } else { + // Constraint applies to an already visited connector + let peer_solution = new_solution.connector_mapping.get(&peer_connector_id).unwrap(); + if port_desc.num_times_fired == 0 { + let peer_mapped_id = peer_solution.port_mapping.get(&peer_port_id).unwrap(); + if peer_mapped_id.is_some() { + all_constraints_satisfied = false; + break; + } + } else if peer_port_info.is_getter { + // Peer is getter, so its port should be mapped to one + // of our branch IDs. To simplify lookup we look at the + // last message we sent to the getter. + debug_assert!(port_desc.last_registered_index.is_some()); + let peer_port = peer_solution.port_mapping.get(&peer_port_id) + .map_or(None, |v| *v); + + if port_desc.last_registered_index != peer_port { + // No match + all_constraints_satisfied = false; + break; + } + } else { + // Peer is putter, so we expect to find our port mapping + // to match one of the branch numbers in the peer + // connector's local solution + debug_assert!(port_desc.last_registered_index.is_some()); + let expected_branch_id = port_desc.last_registered_index.unwrap(); + + if !peer_solution.all_branch_ids.contains(&expected_branch_id) { + all_constraints_satisfied = false; + break; + } + } + } } - if !encountered_propositions.iter().all(|v| *v) { - // Not all of the constraints were matched - continue 'finished_branch_loop + if !all_constraints_satisfied { + // Final checks failed + continue 'branch_loop } - // All of the constraints on the branch did indeed match. + // We're sure that this branch matches the provided solution, so + // push it onto the list of considered solutions + all_solutions.push(new_solution); + } + } + + fn commit_connector_solution(&mut self, connector_id: u32, branch_id: u32) { + // Retrieve connector and branch + let connector = self.connectors.get_mut(&connector_id).unwrap(); + debug_assert_ne!(branch_id, 0); // because at 0 we have our initial backed-up non-sync branch + debug_assert!(connector.in_sync); + debug_assert!(connector.spec_branches_done.contains(&branch_id)); + + // Put the selected solution in front, the branch at index 0 is the + // "non-sync" branch. + connector.branches.swap(0, branch_id as usize); + connector.branches.truncate(1); + + // And reset the connector's state for further execution + connector.in_sync = false; + connector.spec_branches_active.clear(); + connector.spec_branches_active.push_back(0); + connector.spec_branches_pending_receive.clear(); + connector.spec_branches_done.clear(); + connector.last_checked_done = 0; + connector.global_inbox.clear(); + connector.global_outbox.clear(); + + // Do the same thing for the final selected branch + let final_branch = &mut connector.branches[0]; + final_branch.index = 0; + final_branch.parent_index = None; + debug_assert_eq!(final_branch.branch_state, BranchState::ReachedEndSync); + final_branch.branch_state = BranchState::RunningNonSync; + final_branch.message_inbox.clear(); + final_branch.port_mapping.clear(); + + // Might be that the connector was no longer running, if so, put it back + // in the list of connectors to run + if !self.connectors_active.contains(&connector_id) { + self.connectors_active.push_back(connector_id); } } fn generate_connector_id(&mut self) -> u32 { - let id = self.registry.connector_counter; - self.registry.connector_counter += 1; + let id = self.connector_counter; + self.connector_counter += 1; return id; } + // ------------------------------------------------------------------------- + // Helpers for port management + // ------------------------------------------------------------------------- + + #[inline] + fn add_owned_channel(ports: &mut HashMap, port_counter: &mut u32, owning_connector_id: Option) -> (u32, u32) { + let get_id = *port_counter; + let put_id = *port_counter + 1; + (*port_counter) += 2; + + ports.insert(get_id, PortDesc{ + id: get_id, + peer_id: put_id, + owning_connector_id, + is_getter: true, + }); + ports.insert(put_id, PortDesc{ + id: put_id, + peer_id: get_id, + owning_connector_id, + is_getter: false, + }); + + return (put_id, get_id); + } + // ------------------------------------------------------------------------- // Helpers for branch management // ------------------------------------------------------------------------- @@ -760,29 +980,42 @@ impl Runtime { debug_assert_eq!(desc.branches.len(), 1); debug_assert!(desc.spec_branches_active.is_empty()); let new_branch_index = 1; - let new_branch_identifier = desc.branch_id_counter; - desc.branch_id_counter += 1; // Push first speculative branch as active branch - let new_branch = BranchDesc::new_sync_from(new_branch_index, new_branch_identifier, &desc.branches[0]); + let new_branch = BranchDesc::new_sync_from(new_branch_index, &desc.branches[0]); desc.branches.push(new_branch); - desc.spec_branches_active.push_back(new_id); + desc.spec_branches_active.push_back(new_branch_index); desc.in_sync = true; } - /// Duplicates a particular (speculative) branch and returns its index. - fn duplicate_branch(desc: &mut ConnectorDesc, original_branch_idx: u32) -> u32 { + /// Duplicates a particular (speculative) branch and returns it. Due to + /// borrowing rules in code that uses this helper the returned branch still + /// needs to be pushed onto the member `branches`. + fn duplicate_branch(desc: &ConnectorDesc, original_branch_idx: u32) -> BranchDesc { let original_branch = &desc.branches[original_branch_idx as usize]; debug_assert!(desc.in_sync); let copied_index = desc.branches.len() as u32; - let copied_id = desc.branch_id_counter; - desc.branch_id_counter += 1; + let copied_branch = BranchDesc::new_sync_from(copied_index, original_branch); + + return copied_branch; + } - let copied_branch = BranchDesc::new_sync_from(copied_index, copied_id, original_branch); - desc.branches.push(copied_branch); + /// Retrieves all parent IDs of a particular branch. These numbers run from + /// the leaf towards the parent. + fn determine_branch_ids(&self, desc: &ConnectorDesc, first_branch_index: u32, result: &mut Vec) { + let mut next_branch_index = first_branch_index; + result.clear(); - return copied_index; + loop { + result.push(next_branch_index); + let branch = &desc.branches[next_branch_index as usize]; + + match branch.parent_index { + Some(index) => next_branch_index = index, + None => return, + } + } } } @@ -792,24 +1025,37 @@ impl Runtime { /// which the runtime modifies the appropriate variables and continues executing /// the code again. struct Context<'a> { + // Temporary references to branch related storage + inbox: &'a HashMap<(PortId, u32), ValueGroup>, + port_mapping: &'a HashMap, // Properties of currently running connector/branch connector_id: u32, branch_id: Option, + just_called_did_put: bool, // Resources ready to be retrieved by running code pending_channel: Option<(Value, Value)>, // (put, get) ports } impl<'a> crate::protocol::RunContext for Context<'a> { - fn did_put(&self, port: PortId) -> bool { - todo!() + fn did_put(&mut self, port: PortId) -> bool { + // Note that we want "did put" to return false if we have fired zero + // times, because this implies we did a prevous + return self.just_called_did_put } - fn get(&self, port: PortId) -> Option { - todo!() + fn get(&mut self, port: PortId) -> Option { + let inbox_key = (port, 1); + match self.inbox.get(&inbox_key) { + None => None, + Some(value) => Some(value.clone()), + } } - fn fires(&self, port: PortId) -> Option { - todo!() + fn fires(&mut self, port: PortId) -> Option { + match self.port_mapping.get(&port) { + None => None, + Some(port_info) => Some(Value::Bool(port_info.num_times_fired != 0)), + } } fn get_channel(&mut self) -> Option<(Value, Value)> { @@ -825,7 +1071,7 @@ fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) match value { Value::Input(port_id) | Value::Output(port_id) => { // This is an actual port - for prev_port in ports { + for prev_port in ports.iter() { if prev_port == port_id { // Already added return;