From 0d5a89aea247809ac83b882f35d9f7834daa3338 2021-09-13 12:22:34 From: MH Date: 2021-09-13 12:22:34 Subject: [PATCH] halfway shared-memory new consensus algorithm --- diff --git a/src/protocol/eval/value.rs b/src/protocol/eval/value.rs index fb743f3b77cc324198f44ab08f9cee141593f724..6d6a618e386fa47a56189146f6ebd208a4608d7f 100644 --- a/src/protocol/eval/value.rs +++ b/src/protocol/eval/value.rs @@ -160,6 +160,7 @@ impl Value { /// /// Again: this is a temporary thing, hopefully removed once we move to a /// bytecode interpreter. +#[derive(Clone)] pub struct ValueGroup { pub(crate) values: Vec, pub(crate) regions: Vec> diff --git a/src/runtime2/messages.rs b/src/runtime2/messages.rs new file mode 100644 index 0000000000000000000000000000000000000000..1d29934465b34add25097b85e6a760806cbf87da --- /dev/null +++ b/src/runtime2/messages.rs @@ -0,0 +1,137 @@ +use std::collections::HashMap; +use std::collections::hash_map::Entry; + +use crate::PortId; +use crate::common::Id; +use crate::protocol::*; +use crate::protocol::eval::*; + +/// A message residing in a connector's inbox (waiting to be put into some kind +/// of speculative branch), or a message waiting to be sent. +#[derive(Clone)] +pub struct BufferedMessage { + pub(crate) sending_port: PortId, + pub(crate) receiving_port: PortId, + pub(crate) peer_prev_branch_id: Option, + pub(crate) peer_cur_branch_id: u32, + pub(crate) message: ValueGroup, +} + +/// An action performed on a port. Unsure about this +#[derive(PartialEq, Eq, Hash)] +struct PortAction { + port_id: u32, + prev_branch_id: Option, +} + +/// A connector's global inbox. Any received message ends up here. This is +/// because a message might be received before a branch arrives at the +/// corresponding `get()` that is supposed to receive that message. Hence we +/// need to store it for all future branches that might be able to receive it. +pub struct ConnectorInbox { + // TODO: @optimize, HashMap + Vec is a bit stupid. + messages: HashMap> +} + +impl ConnectorInbox { + pub fn new() -> Self { + Self { + messages: HashMap::new(), + } + } + + /// Inserts a new message into the inbox. + pub fn insert_message(&mut self, message: BufferedMessage) { + // TODO: @error - Messages are received from actors we generally cannot + // trust, and may be unreliable, so messages may be received multiple + // times or have spoofed branch IDs. Debug asserts are present for the + // initial implementation. + + // If it is the first message on the port, then we cannot possible have + // a previous port mapping on that port. + let port_action = PortAction{ + port_id: message.sending_port.0.u32_suffix, + prev_branch_id: message.peer_prev_branch_id, + }; + + match self.messages.entry(port_action) { + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + debug_assert!( + entry.iter() + .find(|v| v.peer_cur_branch_id == message.peer_cur_branch_id) + .is_none(), + "inbox already contains sent message (same new branch ID)" + ); + + entry.push(message); + }, + Entry::Vacant(entry) => { + entry.insert(vec![message]); + } + } + } + + /// Checks if the provided port (and the branch id mapped to that port) + /// correspond to any messages in the inbox. + pub fn find_matching_message(&self, port_id: u32, prev_branch_id_at_port: Option) -> Option<&[BufferedMessage]> { + let port_action = PortAction{ + port_id, + prev_branch_id: prev_branch_id_at_port, + }; + + match self.messages.get(&port_action) { + Some(messages) => return Some(messages.as_slice()), + None => return None, + } + } +} + +/// A connector's outbox. A temporary storage for messages that are sent by +/// branches performing `put`s until we're done running all branches and can +/// actually transmit the messages. +pub struct ConnectorOutbox { + messages: Vec, + sent_counter: usize, +} + +impl ConnectorOutbox { + pub fn new() -> Self { + Self{ + messages: Vec::new(), + sent_counter: 0, + } + } + + pub fn insert_message(&mut self, message: BufferedMessage) { + // TODO: @error - Depending on the way we implement the runtime in the + // future we might end up not trusting "our own code" (i.e. in case + // the connectors we are running are described by foreign code) + debug_assert!( + self.messages.iter() + .find(|v| + v.sending_port == message.sending_port && + v.peer_prev_branch_id == message.peer_prev_branch_id + ) + .is_none(), + "messages was already registered for sending" + ); + + self.messages.push(message); + } + + pub fn take_next_message_to_send(&mut self) -> Option<&BufferedMessage> { + if self.sent_counter == self.messages.len() { + return None; + } + + let cur_index = self.sent_counter; + self.sent_counter += 1; + return Some(&self.messages[cur_index]); + } + + pub fn clear(&mut self) { + self.messages.clear(); + self.sent_counter = 0; + } +} \ No newline at end of file diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs index a6d9e0e02d16010e5298d0bef7979c9c14733448..51d48d1953e4f225196c577dc7637fa976451ed5 100644 --- a/src/runtime2/mod.rs +++ b/src/runtime2/mod.rs @@ -1 +1,2 @@ -mod runtime; \ No newline at end of file +mod runtime; +mod messages; \ No newline at end of file diff --git a/src/runtime2/runtime.rs b/src/runtime2/runtime.rs index 8c73481041f32674f8505035dc4dfa306d7519ca..81b976c34f313ec9e6852da87a32b261fb042b02 100644 --- a/src/runtime2/runtime.rs +++ b/src/runtime2/runtime.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::hash_map::{Entry}; use crate::{Polarity, PortId}; @@ -8,6 +8,7 @@ use crate::protocol::*; use crate::protocol::eval::*; use super::registry::Registry; +use super::messages::*; enum AddComponentError { ModuleDoesNotExist, @@ -22,26 +23,17 @@ struct PortDesc { is_getter: bool, // otherwise one can only call `put` } -// Message received from some kind of peer -struct BufferedMessage { - // If in inbox, then sender is the connector's peer. If in the outbox, then - // the sender is the connector itself. - sending_port: PortId, - receiving_port: PortId, - peer_prev_branch_id: Option, // of the sender - peer_cur_branch_id: u32, // of the sender - message: ValueGroup, -} - struct ConnectorDesc { id: u32, in_sync: bool, branches: Vec, // first one is always non-speculative one branch_id_counter: u32, spec_branches_active: VecDeque, // branches that can be run immediately - spec_branches_pending_receive: HashMap, // from port_id to branch index - global_inbox: HashMap<(PortId, u32), BufferedMessage>, - global_outbox: HashMap<(PortId, u32), BufferedMessage>, + spec_branches_pending_receive: HashMap>, // from port_id to branch index + spec_branches_done: Vec, + last_checked_done: u32, + global_inbox: ConnectorInbox, + global_outbox: ConnectorOutbox, } impl ConnectorDesc { @@ -58,8 +50,10 @@ impl ConnectorDesc { branch_id_counter: 1, spec_branches_active: branches_active, spec_branches_pending_receive: HashMap::new(), - global_inbox: HashMap::new(), - global_outbox: HashMap::new(), + spec_branches_done: Vec::new(), + last_checked_done: 0, + global_inbox: ConnectorInbox::new(), + global_outbox: ConnectorOutbox::new(), } } } @@ -166,6 +160,25 @@ impl Registry { } } +#[derive(Clone, Copy, Eq, PartialEq)] +enum ProposedBranchConstraint { + SilentPort(u32), // port id + BranchNumber(u32), // branch id +} + +// Local solution of the connector +struct ProposedConnectorSolution { + final_branch_id: u32, + all_branch_ids: Vec, // the final branch ID and, recursively, all parents + silent_ports: Vec, // port IDs of the connector itself +} + +struct ProposedSolution { + connector_mapping: HashMap, // from connector ID to branch ID + connector_propositions: HashMap>, // from connector ID to encountered branch numbers + remaining_connectors: Vec, // connectors that still need to be visited +} + // TODO: @performance, use freelists+ids instead of HashMaps struct Runtime { protocol: Arc, @@ -197,7 +210,6 @@ impl Runtime { use AddComponentError as ACE; use crate::runtime::error::AddComponentError as OldACE; - // TODO: Allow the ValueGroup to contain any kind of value // TODO: Remove the responsibility of adding a component from the PD // Lookup module and the component @@ -252,8 +264,16 @@ impl Runtime { pub fn run(&mut self) { // Go through all active connectors while !self.connectors_active.is_empty() { + // Run a single connector let next_id = self.connectors_active.pop_front().unwrap(); - self.run_connector(next_id); + let run_again = self.run_connector(next_id); + + if run_again { + self.connectors_active.push_back(next_id); + } + + self.empty_connector_outbox(next_id); + self.check_connector_solution(next_id); } } @@ -269,7 +289,7 @@ impl Runtime { pending_channel: None, }; - let mut call_again = false; + let mut call_again = false; // TODO: Come back to this, silly pattern while call_again { call_again = false; // bit of a silly pattern, maybe revise @@ -321,6 +341,8 @@ impl Runtime { // First check if a port value is assigned to the // current branch. If so, check if it is consistent. debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); + let mut insert_in_pending_receive = false; + match branch.port_mapping.entry(port_id) { Entry::Vacant(entry) => { // No entry yet, so force to firing @@ -329,7 +351,7 @@ impl Runtime { num_times_fired: 1, }); branch.branch_state = BranchState::BranchPoint; - desc.spec_branches_pending_receive.insert(port_id, branch_index); + insert_in_pending_receive = true; }, Entry::Occupied(entry) => { // Have an entry, check if it is consistent @@ -339,19 +361,124 @@ impl Runtime { branch.branch_state = BranchState::Failed; } else { // Perfectly fine, add to queue + debug_assert!(entry.last_registered_identifier.is_none()); + assert_eq!(entry.num_times_fired, 1, "temp: keeping fires() for now"); branch.branch_state = BranchState::BranchPoint; - desc.spec_branches_pending_receive.insert(port_id, branch_index); + insert_in_pending_receive = true; + } + } + } + + if insert_in_pending_receive { + // Perform the insert + match desc.spec_branches_pending_receive.entry(port_id) { + Entry::Vacant(entry) => { + entry.insert(vec![branch_index]); + } + Entry::Occupied(mut entry) => { + let entry = entry.get_mut(); + debug_assert!(!entry.contains(&branch_index)); + entry.push(branch_index); + } + } + + // But also check immediately if we don't have a + // previously received message. If so, we + // immediately branch and accept the message + if let Some(messages) = desc.global_inbox.find_matching_message(port_id.0.u32_suffix, None) { + for message in messages { + let new_branch_idx = Self::duplicate_branch(desc, branch_index); + let new_branch = &mut desc.branches[new_branch_idx as usize]; + let new_port_desc = new_branch.port_mapping.get_mut(&port_id).unwrap(); + new_port_desc.last_registered_identifier = Some(message.peer_cur_branch_id); + new_branch.message_inbox.insert((port_id, 1), message.message.clone()); + + desc.spec_branches_active.push_back(new_branch_idx); } } } }, RunResult::BranchAtSyncEnd => { + // Check the branch for any ports that were not used and + // insert them in the port mapping as not having fired. + for port_index in branch.owned_ports { + let port_id = PortId(Id{ connector_id: desc.id, u32_suffix: port_index }); + if let Entry::Vacant(entry) = branch.port_mapping.entry(port_id) { + entry.insert(BranchPortDesc { + last_registered_identifier: None, + num_times_fired: 0 + }); + } + } + + // Mark the branch as being done branch.branch_state = BranchState::ReachedEndSync; - todo!("somehow propose solution"); + desc.spec_branches_done.push(branch_index); }, RunResult::BranchPut(port_id, value_group) => { debug_assert!(branch.owned_ports.contains(&port_id.0.u32_suffix)); - debug_assert_eq!(value_group.values.len(), 1) + debug_assert_eq!(value_group.values.len(), 1); // can only send one value + + // Branch just performed a `put()`. Check if we have + // assigned the port value and if so, if it is + // consistent. + let mut can_put = true; + match branch.port_mapping.entry(port_id) { + Entry::Vacant(entry) => { + // No entry yet + entry.insert(BranchPortDesc{ + last_registered_identifier: Some(branch.identifier), + num_times_fired: 1, + }); + }, + Entry::Occupied(mut entry) => { + // Pre-existing entry + let entry = entry.get_mut(); + if entry.num_times_fired == 0 { + // This is 'fine' in the sense that we have + // a normal inconsistency in the branch. + branch.branch_state = BranchState::Failed; + can_put = false; + } else if entry.last_registered_identifier.is_none() { + // A put() that follows a fires() + entry.last_registered_identifier = Some(branch.identifier); + } else { + // This should be fine in the future. But + // for now we throw an error as it doesn't + // mesh well with the 'fires()' concept. + todo!("throw an error of some sort, then fail all related") + } + } + } + + if can_put { + // Actually put the message in the outbox + let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap(); + let peer_id = port_desc.peer_id; + let peer_desc = self.registry.ports.get(&peer_id).unwrap(); + debug_assert!(peer_desc.owning_connector_id.is_some()); + + let peer_id = PortId(Id{ + connector_id: peer_desc.owning_connector_id.unwrap(), + u32_suffix: peer_id + }); + + // For now this is the one and only time we're going + // to send a message. So for now we can't send a + // branch ID. + desc.global_outbox.insert((port_id, 1), BufferedMessage{ + sending_port: port_id, + receiving_port: peer_id, + peer_prev_branch_id: None, + peer_cur_branch_id: 0, + message: value_group, + }); + + // Finally, because we were able to put the message, + // we can run the branch again + desc.spec_branches_active.push_back(branch_index); + call_again = true; + } }, _ => unreachable!("got result '{:?}' from running component in sync mode", run_result), } @@ -408,6 +535,213 @@ impl Runtime { return true; } + /// Puts all the messages that are currently in the outbox of a particular + /// connector into the inbox of the receivers. If possible then branches + /// will be created that receive those messages. + fn empty_connector_outbox(&mut self, connector_index: u32) { + let connector = self.registry.connectors.get_mut(&connector_index).unwrap(); + while let Some(message_to_send) = connector.global_outbox.take_next_message_to_send() { + // Lookup the target connector + let port_desc = self.registry.ports.get(&target_port.0.u32_suffix).unwrap(); + debug_assert_eq!(port_desc.owning_connector_id.unwrap(), target_port.0.connector_id); + let target_connector_id = port_desc.owning_connector_id.unwrap(); + let target_connector = self.registry.connectors.get_mut(&target_connector_id).unwrap(); + + // In any case, always put the message in the global inbox + target_connector.global_inbox.insert_message(message_to_send.clone()); + + // Check if there are any branches that are waiting on + // receives + if let Some(branch_indices) = target_connector.spec_branches_pending_receive.get(&target_port) { + // Check each of the branches for a port mapping that + // matches the one on the message header + for branch_index in branch_indices { + let branch = &mut target_connector.branches[*branch_index as usize]; + debug_assert_eq!(branch.branch_state, BranchState::BranchPoint); + + let mut can_branch = false; + + if let Some(port_desc) = branch.port_mapping.get(&message_to_send.receiving_port) { + if port_desc.last_registered_identifier == message_to_send.peer_prev_branch_id && port_desc.num_times_fired == 1 { + can_branch = true; + } + } + + if can_branch { + // Put the message inside a clone of the currently + // waiting branch + let new_branch_idx = Self::duplicate_branch(target_connector, *branch_index); + let new_branch = &mut target_connector.branches[new_branch_idx as usize]; + let new_port_desc = &mut new_branch.port_mapping.get_mut(&message_to_send.receiving_port).unwrap(); + new_port_desc.last_registered_identifier = Some(message_to_send.peer_cur_branch_id); + new_branch.message_inbox.insert((message_to_send.receiving_port, 1), message_to_send.message.clone()); + + // And queue the branch for further execution + target_connector.spec_branches_active.push(new_branch_idx); + if !self.connectors_active.contains(&target_connector.id) { + self.connectors_active.push_back(target_connector.id); + } + } + } + } + } + } + + /// Checks a connector for the submitted solutions. After all neighbouring + /// connectors have been checked all of their "last checked solution" index + /// will be incremented. + fn check_connector_new_solutions(&mut self, connector_index: u32) { + // Take connector and start processing its solutions + let connector = self.registry.connectors.get_mut(&connector_index).unwrap(); + let mut considered_connectors = HashSet::new(); + let mut valid_solutions = Vec::new(); + + while connector.last_checked_done != connector.spec_branches_done.len() as u32 { + // We have a new solution to consider + let start_branch_index = connector.spec_branches_done[connector.last_checked_done as usize]; + connector.last_checked_done += 1; + + let branch = &connector.branches[start_branch_index as usize]; + debug_assert_eq!(branch.branch_state, BranchState::ReachedEndSync); + + // Clear storage for potential solutions + considered_connectors.clear(); + + // Start seeking solution among other connectors within the same + // synchronous region + considered_connectors.insert(connector.id); + for port in branch.port_ + } + } + + fn check_connector_solution(&self, first_connector_index: u32, first_branch_index: u32) { + // Take the connector and branch of interest + let first_connector = self.registry.connectors.get(&first_connector_index).unwrap(); + let first_branch = &first_connector.branches[first_branch_index as usize]; + debug_assert_eq!(first_branch.branch_state, BranchState::ReachedEndSync); + + // Setup the first solution + let mut first_solution = ProposedSolution{ + connector_mapping: HashMap::new(), + connector_propositions: HashMap::new(), + remaining_connectors: Vec::new(), + }; + first_solution.connector_mapping.insert(first_connector.id, first_branch.identifier); + for (port_id, port_mapping) in first_branch.port_mapping.iter() { + let port_desc = self.registry.ports.get(&port_id.0.u32_suffix).unwrap(); + let peer_port_id = port_desc.peer_id; + let peer_port_desc = self.registry.ports.get(&peer_port_id).unwrap(); + let peer_connector_id = peer_port_desc.owning_connector_id.unwrap(); + + let constraint = match port_mapping.last_registered_identifier { + Some(branch_id) => ProposedBranchConstraint::BranchNumber(branch_id), + None => ProposedBranchConstraint::SilentPort(peer_port_id), + }; + + match first_solution.connector_propositions.entry(peer_connector_id) { + Entry::Vacant(entry) => { + // Not yet encountered + entry.insert(vec![constraint]); + first_solution.remaining_connectors.push(peer_connector_id); + }, + Entry::Occupied(mut entry) => { + // Already encountered + let entry = entry.get_mut(); + if !entry.contains(&constraint) { + entry.push(constraint); + } + } + } + } + + // Setup storage for all possible solutions + let mut all_solutions = Vec::new(); + all_solutions.push(first_solution); + + while !all_solutions.is_empty() { + let mut cur_solution = all_solutions.pop().unwrap(); + + } + } + + fn merge_solution_with_connector(&self, cur_solution: &mut ProposedSolution, all_solutions: &mut Vec, target_connector: u32) { + debug_assert!(!cur_solution.connector_mapping.contains_key(&target_connector)); // not yet visited + debug_assert!(cur_solution.connector_propositions.contains_key(&target_connector)); // but we encountered a reference to it + + let branch_propositions = cur_solution.connector_propositions.get(&target_connector).unwrap(); + let cur_connector = self.registry.connectors.get(&target_connector).unwrap(); + + // Make sure all propositions are unique + for i in 0..branch_propositions.len() { + let proposition_i = branch_propositions[i]; + for j in 0..i { + let proposition_j = branch_propositions[j]; + debug_assert_ne!(proposition_i, proposition_j); + } + } + + // Check connector for compatible branches + let mut considered_branches = Vec::with_capacity(cur_connector.spec_branches_done.len()); + let mut encountered_propositions = Vec::new(); + + 'finished_branch_loop: for branch_idx in cur_connector.spec_branches_done { + // Reset the propositions matching variables + encountered_propositions.clear(); + encountered_propositions.resize(branch_propositions.len(), false); + + // First check the silent port propositions + let cur_branch = &cur_connector.branches[branch_idx as usize]; + for (proposition_idx, proposition) in branch_propositions.iter().enumerate() { + match proposition { + ProposedBranchConstraint::SilentPort(port_id) => { + let old_school_port_id = PortId(Id{ connector_id: cur_connector.id, u32_suffix: *port_id }); + let port_mapping = cur_branch.port_mapping.get(&old_school_port_id).unwrap(); + if port_mapping.num_times_fired != 0 { + // Port did fire, so the current branch is not + // compatible + continue 'finished_branch_loop; + } + + // Otherwise, the port was silent indeed + encountered_propositions[proposition_idx] = true; + }, + ProposedBranchConstraint::BranchNumber(_) => {}, + } + } + + // Then check the branch number propositions + let mut parent_branch_idx = branch_idx; + loop { + let branch = &cur_connector.branches[parent_branch_idx as usize]; + for proposition_idx in 0..branch_propositions.len() { + let proposition = branch_propositions[proposition_idx]; + match proposition { + ProposedBranchConstraint::SilentPort(_) => {}, + ProposedBranchConstraint::BranchNumber(branch_number) => { + if branch_number == branch.identifier { + encountered_propositions[proposition_idx] = true; + } + } + } + } + + if branch.parent_index.is_none() { + // No more parents + break; + } + + parent_branch_idx = branch.parent_index.unwrap(); + } + + if !encountered_propositions.iter().all(|v| *v) { + // Not all of the constraints were matched + continue 'finished_branch_loop + } + + // All of the constraints on the branch did indeed match. + } + } + fn generate_connector_id(&mut self) -> u32 { let id = self.registry.connector_counter; self.registry.connector_counter += 1;