diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index d0c833ceb22ef431c45ac02b134bac28b70bd53a..c3a2947ea86c1ed9c9e458826ca585c236ca9d1d 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,15 +1,16 @@ use crate::collections::VecSet; + use crate::protocol::eval::ValueGroup; -use crate::runtime2::inbox2::DataContent; use super::branch::{BranchId, ExecTree, QueueKind}; use super::ConnectorId; -use super::port::{ChannelId, Port, PortIdLocal}; -use super::inbox2::{ - DataHeader, DataMessageFancy, MessageFancy, - SyncContent, SyncHeader, SyncMessageFancy, PortAnnotation +use super::port::{ChannelId, PortIdLocal}; +use super::inbox::{ + Message, PortAnnotation, + DataMessage, DataContent, DataHeader, + SyncMessage, SyncContent, SyncHeader, }; -use super::scheduler::ComponentCtxFancy; +use super::scheduler::ComponentCtx; struct BranchAnnotation { port_mapping: Vec, @@ -94,7 +95,7 @@ impl Consensus { /// Sets up the consensus algorithm for a new synchronous round. The /// provided ports should be the ports the component owns at the start of /// the sync round. - pub fn start_sync(&mut self, ctx: &ComponentCtxFancy) { + pub fn start_sync(&mut self, ctx: &ComponentCtx) { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); debug_assert!(self.last_finished_handled.is_none()); @@ -183,7 +184,7 @@ impl Consensus { /// Generates sync messages for any branches that are at the end of the /// sync block. To find these branches, they should've been put in the /// "finished" queue in the execution tree. - pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtxFancy) -> Option { + pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtx) -> Option { debug_assert!(self.is_in_sync()); let mut last_branch_id = self.last_finished_handled; @@ -201,7 +202,7 @@ impl Consensus { let channel_id = port_desc.channel_id; if !self.encountered_ports.contains(&port.port_id) { - ctx.submit_message(MessageFancy::Data(DataMessageFancy{ + ctx.submit_message(Message::Data(DataMessage { sync_header: SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, @@ -265,7 +266,7 @@ impl Consensus { /// Prepares a message for sending. Caller should have made sure that /// sending the message is consistent with the speculative state. - pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) { + pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) { debug_assert!(self.is_in_sync()); let branch = &mut self.branch_annotations[branch_id.index as usize]; @@ -317,7 +318,7 @@ impl Consensus { /// `branch_can_receive` function. /// 2. We return the branches that *can* receive the message, you still /// have to explicitly call `notify_of_received_message`. - pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessageFancy, ctx: &mut ComponentCtxFancy, target_ids: &mut Vec) { + pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) { self.handle_received_data_header(exec_tree, &message.data_header, &message.content, target_ids); self.handle_received_sync_header(&message.sync_header, ctx); } @@ -325,7 +326,7 @@ impl Consensus { /// Handles a new sync message by handling the sync header and the contents /// of the message. Returns `Some` with the branch ID of the global solution /// if the sync solution has been found. - pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) -> Option { + pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) -> Option { self.handle_received_sync_header(&message.sync_header, ctx); // And handle the contents @@ -419,7 +420,7 @@ impl Consensus { } } - fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) { + fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) { debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves self.encountered_peers.push(sync_header.sending_component_id); @@ -434,12 +435,12 @@ impl Consensus { continue } - let message = SyncMessageFancy{ + let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: *encountered_id, content: SyncContent::Notification, }; - ctx.submit_message(MessageFancy::Sync(message)); + ctx.submit_message(Message::Sync(message)); } // But also send our locally combined solution @@ -447,16 +448,16 @@ impl Consensus { } else if sync_header.highest_component_id < self.highest_connector_id { // Sender has lower leader ID, so it should know about our higher // one. - let message = SyncMessageFancy{ + let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: sync_header.sending_component_id, content: SyncContent::Notification }; - ctx.submit_message(MessageFancy::Sync(message)); + ctx.submit_message(Message::Sync(message)); } // else: exactly equal, so do nothing } - fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtxFancy) -> Option { + fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtx) -> Option { println!("DEBUG [....:.. conn:{:02}]: Storing local solution for component {}, branch {}", ctx.id.0, solution.component.0, solution.final_branch_id.index); if self.highest_connector_id == ctx.id { @@ -470,12 +471,12 @@ impl Consensus { continue; } - let message = SyncMessageFancy{ + let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: connector_id, content: SyncContent::GlobalSolution(global_solution.clone()), }; - ctx.submit_message(MessageFancy::Sync(message)); + ctx.submit_message(Message::Sync(message)); } debug_assert!(my_final_branch_id.is_valid()); @@ -485,34 +486,34 @@ impl Consensus { } } else { // Someone else is the leader - let message = SyncMessageFancy{ + let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: self.highest_connector_id, content: SyncContent::LocalSolution(solution), }; - ctx.submit_message(MessageFancy::Sync(message)); + ctx.submit_message(Message::Sync(message)); return None; } } #[inline] - fn create_sync_header(&self, ctx: &ComponentCtxFancy) -> SyncHeader { + fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader { return SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, } } - fn forward_local_solutions(&mut self, ctx: &mut ComponentCtxFancy) { + fn forward_local_solutions(&mut self, ctx: &mut ComponentCtx) { debug_assert_ne!(self.highest_connector_id, ctx.id); for local_solution in self.solution_combiner.drain() { - let message = SyncMessageFancy{ + let message = SyncMessage { sync_header: self.create_sync_header(ctx), target_component_id: self.highest_connector_id, content: SyncContent::LocalSolution(local_solution), }; - ctx.submit_message(MessageFancy::Sync(message)); + ctx.submit_message(Message::Sync(message)); } } }