diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 1affea56d580a3dcf628f13a2f71142a82e4525c..d0c833ceb22ef431c45ac02b134bac28b70bd53a 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,5 +1,6 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; +use crate::runtime2::inbox2::DataContent; use super::branch::{BranchId, ExecTree, QueueKind}; use super::ConnectorId; @@ -42,15 +43,18 @@ pub(crate) struct GlobalSolution { // TODO: A lot of stuff should be batched. Like checking all the sync headers // and sending "I have a higher ID" messages. pub(crate) struct Consensus { + // --- State that is cleared after each round // Local component's state highest_connector_id: ConnectorId, branch_annotations: Vec, last_finished_handled: Option, - // Gathered state (in case we are currently the leader of the distributed - // consensus protocol) - encountered_peers: VecSet, + // Gathered state from communication + encountered_peers: VecSet, // to determine when we should send "found a higher ID" messages. + encountered_ports: VecSet, // to determine if we should send "port remains silent" messages. solution_combiner: SolutionCombiner, - // Workspaces + // --- Persistent state + // TODO: Tracking sync round numbers + // --- Workspaces workspace_ports: Vec, } @@ -67,6 +71,7 @@ impl Consensus { branch_annotations: Vec::new(), last_finished_handled: None, encountered_peers: VecSet::new(), + encountered_ports: VecSet::new(), solution_combiner: SolutionCombiner::new(), workspace_ports: Vec::new(), } @@ -188,9 +193,32 @@ impl Consensus { let mut target_mapping = Vec::with_capacity(source_mapping.len()); for port in source_mapping { + // Note: if the port is silent, and we've never communicated + // over the port, then we need to do so now, to let the peer + // component know about our sync leader state. let port_desc = ctx.get_port_by_id(port.port_id).unwrap(); + let peer_port_id = port_desc.peer_id; + let channel_id = port_desc.channel_id; + + if !self.encountered_ports.contains(&port.port_id) { + ctx.submit_message(MessageFancy::Data(DataMessageFancy{ + sync_header: SyncHeader{ + sending_component_id: ctx.id, + highest_component_id: self.highest_connector_id, + }, + data_header: DataHeader{ + expected_mapping: source_mapping.clone(), + sending_port: port.port_id, + target_port: peer_port_id, + new_mapping: BranchId::new_invalid(), + }, + content: DataContent::SilentPortNotification, + })); + self.encountered_ports.push(port.port_id); + } + target_mapping.push(( - port_desc.channel_id, + channel_id, port.registered_id.unwrap_or(BranchId::new_invalid()) )); } @@ -229,6 +257,7 @@ impl Consensus { self.branch_annotations.clear(); self.last_finished_handled = None; self.encountered_peers.clear(); + self.encountered_ports.clear(); self.solution_combiner.clear(); } @@ -238,11 +267,10 @@ impl Consensus { /// sending the message is consistent with the speculative state. pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) { debug_assert!(self.is_in_sync()); - let sync_header = self.create_sync_header(ctx); - let branch = &mut self.branch_annotations[branch_id.index as usize]; if cfg!(debug_assertions) { + // Check for consistent mapping let port = branch.port_mapping.iter() .find(|v| v.port_id == source_port_id) .unwrap(); @@ -257,17 +285,19 @@ impl Consensus { self.workspace_ports.clear(); } + // Construct data header // TODO: Handle multiple firings. Right now we just assign the current // branch to the `None` value because we know we can only send once. debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none()); let port_info = ctx.get_port_by_id(source_port_id).unwrap(); let data_header = DataHeader{ expected_mapping: branch.port_mapping.clone(), - sending_port: port_info.peer_id, + sending_port: port_info.self_id, target_port: port_info.peer_id, new_mapping: branch_id }; + // Update port mapping for mapping in &mut branch.port_mapping { if mapping.port_id == source_port_id { mapping.expected_firing = Some(true); @@ -275,7 +305,9 @@ impl Consensus { } } - return (sync_header, data_header); + self.encountered_ports.push(source_port_id); + + return (self.create_sync_header(ctx), data_header); } /// Handles a new data message by handling the data and sync header, and @@ -286,7 +318,7 @@ impl Consensus { /// 2. We return the branches that *can* receive the message, you still /// have to explicitly call `notify_of_received_message`. pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessageFancy, ctx: &mut ComponentCtxFancy, target_ids: &mut Vec) { - self.handle_received_data_header(exec_tree, &message.data_header, target_ids); + self.handle_received_data_header(exec_tree, &message.data_header, &message.content, target_ids); self.handle_received_sync_header(&message.sync_header, ctx); } @@ -318,8 +350,9 @@ impl Consensus { } } - pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) { - debug_assert!(self.branch_can_receive(branch_id, data_header)); + pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) { + debug_assert!(self.branch_can_receive(branch_id, data_header, content)); + let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { if mapping.port_id == data_header.target_port { @@ -328,7 +361,7 @@ impl Consensus { // Check for sent ports debug_assert!(self.workspace_ports.is_empty()); - find_ports_in_value_group(content, &mut self.workspace_ports); + find_ports_in_value_group(content.as_message().unwrap(), &mut self.workspace_ports); if !self.workspace_ports.is_empty() { todo!("handle received ports"); self.workspace_ports.clear(); @@ -345,7 +378,12 @@ impl Consensus { /// Matches the mapping between the branch and the data message. If they /// match then the branch can receive the message. - pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool { + pub fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader, content: &DataContent) -> bool { + if let DataContent::SilentPortNotification = content { + // No port can receive a "silent" notification. + return false; + } + let annotation = &self.branch_annotations[branch_id.index as usize]; for expected in &data_header.expected_mapping { // If we own the port, then we have an entry in the @@ -369,12 +407,12 @@ impl Consensus { /// Checks data header and consults the stored port mapping and the /// execution tree to see which branches may receive the data message's /// contents. - fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec) { + fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec) { for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { if branch.awaiting_port == data_header.target_port { // Found a branch awaiting the message, but we need to make sure // the mapping is correct - if self.branch_can_receive(branch.id, data_header) { + if self.branch_can_receive(branch.id, data_header, content) { target_ids.push(branch.id); } } @@ -419,6 +457,8 @@ impl Consensus { } fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) -> Option { + println!("DEBUG [....:.. conn:{:02}]: Storing local solution for component {}, branch {}", ctx.id.0, solution.component.0, solution.final_branch_id.index); + if self.highest_connector_id == ctx.id { // We are the leader if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {