diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 4e2be12a01bfddf3d22459770d76da2f91bfe2a1..d767ac3b32f5ba86419cf6db6ed055335bd16ca1 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -3,7 +3,7 @@ use crate::protocol::eval::ValueGroup; use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; use crate::runtime2::ConnectorId; use crate::runtime2::inbox2::{DataHeader, SyncHeader}; -use crate::runtime2::port::PortIdLocal; +use crate::runtime2::port::{Port, PortIdLocal}; use crate::runtime2::scheduler::ComponentCtxFancy; use super::inbox2::PortAnnotation; @@ -17,9 +17,12 @@ struct BranchAnnotation { /// /// The type itself serves as an experiment to see how code should be organized. // TODO: Flatten all datastructures +// TODO: Have a "branch+port position hint" in case multiple operations are +// performed on the same port to prevent repeated lookups pub(crate) struct Consensus { highest_connector_id: ConnectorId, branch_annotations: Vec, + workspace_ports: Vec, } #[derive(Clone, Copy, PartialEq, Eq)] @@ -38,10 +41,15 @@ impl Consensus { // --- Controlling sync round and branches + /// Returns whether the consensus algorithm is running in sync mode + pub fn is_in_sync(&self) -> bool { + return !self.branch_annotations.is_empty(); + } + /// Sets up the consensus algorithm for a new synchronous round. The /// provided ports should be the ports the component owns at the start of /// the sync round. - pub fn start_sync(&mut self, ports: &[PortIdLocal]) { + pub fn start_sync(&mut self, ports: &[Port]) { debug_assert!(self.branch_annotations.is_empty()); debug_assert!(!self.highest_connector_id.is_valid()); @@ -50,7 +58,7 @@ impl Consensus { self.branch_annotations.push(BranchAnnotation{ port_mapping: ports.iter() .map(|v| PortAnnotation{ - port_id: *v, + port_id: v.self_id, registered_id: None, expected_firing: None, }) @@ -127,16 +135,48 @@ impl Consensus { /// Prepares a message for sending. Caller should have made sure that /// sending the message is consistent with the speculative state. - pub fn prepare_message(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, value: &ValueGroup) -> (SyncHeader, DataHeader) { + pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) { + debug_assert!(self.is_in_sync()); + let branch = &mut self.branch_annotations[branch_id.index as usize]; + if cfg!(debug_assertions) { - let branch = &self.branch_annotations[branch_id.index as usize]; let port = branch.port_mapping.iter() .find(|v| v.port_id == source_port_id) .unwrap(); debug_assert!(port.expected_firing == None || port.expected_firing == Some(true)); } - + // Check for ports that are begin sent + debug_assert!(self.workspace_ports.is_empty()); + find_ports_in_value_group(content, &mut self.workspace_ports); + if !self.workspace_ports.is_empty() { + todo!("handle sending ports"); + self.workspace_ports.clear(); + } + + let sync_header = SyncHeader{ + sending_component_id: ctx.id, + highest_component_id: self.highest_connector_id, + }; + + // TODO: Handle multiple firings. Right now we just assign the current + // branch to the `None` value because we know we can only send once. + debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none()); + let port_info = ctx.get_port_by_id(source_port_id).unwrap(); + let data_header = DataHeader{ + expected_mapping: branch.port_mapping.clone(), + target_port: port_info.peer_id, + new_mapping: branch_id + }; + + for mapping in &mut branch.port_mapping { + if mapping.port_id == source_port_id { + mapping.expected_firing = Some(true); + mapping.registered_id = Some(branch_id); + } + } + + return (sync_header, data_header); } pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) { @@ -161,12 +201,22 @@ impl Consensus { } } - pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader) { + pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) { debug_assert!(self.branch_can_receive(branch_id, data_header)); let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { if mapping.port_id == data_header.target_port { + // Found the port in which the message should be inserted mapping.registered_id = Some(data_header.new_mapping); + + // Check for sent ports + debug_assert!(self.workspace_ports.is_empty()); + find_ports_in_value_group(content, &mut self.workspace_ports); + if !self.workspace_ports.is_empty() { + todo!("handle received ports"); + self.workspace_ports.clear(); + } + return; } } @@ -196,4 +246,47 @@ impl Consensus { return true; } +} + +/// Recursively goes through the value group, attempting to find ports. +/// Duplicates will only be added once. +pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { + // Helper to check a value for a port and recurse if needed. + use crate::protocol::eval::Value; + + fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { + match value { + Value::Input(port_id) | Value::Output(port_id) => { + // This is an actual port + let cur_port = PortIdLocal::new(port_id.0.u32_suffix); + for prev_port in ports.iter() { + if *prev_port == cur_port { + // Already added + return; + } + } + + ports.push(cur_port); + }, + Value::Array(heap_pos) | + Value::Message(heap_pos) | + Value::String(heap_pos) | + Value::Struct(heap_pos) | + Value::Union(_, heap_pos) => { + // Reference to some dynamic thing which might contain ports, + // so recurse + let heap_region = &group.regions[*heap_pos as usize]; + for embedded_value in heap_region { + find_port_in_value(group, embedded_value, ports); + } + }, + _ => {}, // values we don't care about + } + } + + // Clear the ports, then scan all the available values + ports.clear(); + for value in &value_group.values { + find_port_in_value(value_group, value, ports); + } } \ No newline at end of file