diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 91764ee546a1731534007775d44156f10b8958f7..daea21188bf15d535f184b8faa1c08a3af865ae8 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,6 +1,7 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; +use crate::runtime2::inbox::BranchMarker; use super::ConnectorId; use super::branch::BranchId; @@ -14,19 +15,20 @@ use super::scheduler::ComponentCtx; struct BranchAnnotation { port_mapping: Vec, + cur_marker: BranchMarker, } #[derive(Debug)] pub(crate) struct LocalSolution { component: ConnectorId, final_branch_id: BranchId, - port_mapping: Vec<(ChannelId, BranchId)>, + port_mapping: Vec<(ChannelId, BranchMarker)>, } #[derive(Debug, Clone)] pub(crate) struct GlobalSolution { component_branches: Vec<(ConnectorId, BranchId)>, - channel_mapping: Vec<(ChannelId, BranchId)>, // TODO: This can go, is debugging info + channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info } // ----------------------------------------------------------------------------- @@ -54,7 +56,8 @@ pub(crate) struct Consensus { // --- State that is cleared after each round // Local component's state highest_connector_id: ConnectorId, - branch_annotations: Vec, + branch_annotations: Vec, // index is branch ID + branch_markers: Vec, // index is branch marker, maps to branch // Gathered state from communication encountered_ports: VecSet, // to determine if we should send "port remains silent" messages. solution_combiner: SolutionCombiner, @@ -76,6 +79,7 @@ impl Consensus { return Self { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), + branch_markers: Vec::new(), encountered_ports: VecSet::new(), solution_combiner: SolutionCombiner::new(), peers: Vec::new(), @@ -116,7 +120,9 @@ impl Consensus { expected_firing: None, }) .collect(), + cur_marker: BranchMarker::new_invalid(), }); + self.branch_markers.push(BranchId::new_invalid()); self.highest_connector_id = ctx.id; @@ -129,10 +135,13 @@ impl Consensus { // index is the length in `branch_annotations`. debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize); let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize]; + let new_marker = BranchMarker::new(self.branch_markers.len() as u32); let new_branch_annotations = BranchAnnotation{ port_mapping: parent_branch_annotations.port_mapping.clone(), + cur_marker: new_marker, }; self.branch_annotations.push(new_branch_annotations); + self.branch_markers.push(new_branch_id); } /// Notifies the consensus algorithm that a branch has reached the end of @@ -214,7 +223,7 @@ impl Consensus { expected_mapping: source_mapping.clone(), sending_port: port.port_id, target_port: peer_port_id, - new_mapping: BranchId::new_invalid(), + new_mapping: BranchMarker::new_invalid(), }, content: DataContent::SilentPortNotification, })); @@ -223,7 +232,7 @@ impl Consensus { target_mapping.push(( channel_id, - port.registered_id.unwrap_or(BranchId::new_invalid()) + port.registered_id.unwrap_or(BranchMarker::new_invalid()) )); } @@ -290,23 +299,27 @@ impl Consensus { // Construct data header // TODO: Handle multiple firings. Right now we just assign the current // branch to the `None` value because we know we can only send once. - debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none()); let port_info = ctx.get_port_by_id(source_port_id).unwrap(); let data_header = DataHeader{ expected_mapping: branch.port_mapping.clone(), sending_port: port_info.self_id, target_port: port_info.peer_id, - new_mapping: branch_id + new_mapping: branch.cur_marker, }; // Update port mapping for mapping in &mut branch.port_mapping { if mapping.port_id == source_port_id { mapping.expected_firing = Some(true); - mapping.registered_id = Some(branch_id); + mapping.registered_id = Some(branch.cur_marker); } } + // Update branch marker + let new_marker = BranchMarker::new(self.branch_markers.len() as u32); + branch.cur_marker = new_marker; + self.branch_markers.push(branch_id); + self.encountered_ports.push(source_port_id); return (self.create_sync_header(ctx), data_header); @@ -546,7 +559,7 @@ impl Consensus { #[derive(Debug)] struct MatchedLocalSolution { final_branch_id: BranchId, - channel_mapping: Vec<(ChannelId, BranchId)>, + channel_mapping: Vec<(ChannelId, BranchMarker)>, matches: Vec, }