diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 828dbe3f6c374a16fc5bbb171365482950c1b05b..bd8510e2c1bd789afe73cebd37e385d9acf2b32a 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,18 +1,15 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; -use crate::runtime2::inbox::BranchMarker; -use crate::runtime2::scheduler::ComponentPortChange; use super::ConnectorId; use super::branch::BranchId; use super::port::{ChannelId, PortIdLocal}; use super::inbox::{ - Message, ChannelAnnotation, - DataMessage, DataContent, DataHeader, + Message, ChannelAnnotation, BranchMarker, DataMessage, DataHeader, SyncCompMessage, SyncCompContent, SyncPortMessage, SyncPortContent, SyncHeader, }; -use super::scheduler::ComponentCtx; +use super::scheduler::{ComponentCtx, ComponentPortChange}; struct BranchAnnotation { channel_mapping: Vec, @@ -265,19 +262,15 @@ impl Consensus { let channel_id = port_desc.channel_id; if !self.encountered_ports.contains(&self_port_id) { - ctx.submit_message(Message::Data(DataMessage { + ctx.submit_message(Message::SyncPort(SyncPortMessage { sync_header: SyncHeader{ sending_component_id: ctx.id, highest_component_id: self.highest_connector_id, sync_round: self.sync_round }, - data_header: DataHeader{ - expected_mapping: source_mapping.clone(), - sending_port: self_port_id, - target_port: peer_port_id, - new_mapping: BranchMarker::new_invalid(), - }, - content: DataContent::SilentPortNotification, + source_port: self_port_id, + target_port: peer_port_id, + content: SyncPortContent::SilentPortNotification, })); self.encountered_ports.push(self_port_id); } @@ -439,6 +432,11 @@ impl Consensus { debug_assert!(self.is_in_sync()); debug_assert!(ctx.get_port_by_id(message.target_port).is_some()); match message.content { + SyncPortContent::SilentPortNotification => { + // The point here is to let us become part of the sync round and + // take note of the leader in case all of our ports are silent. + self.encountered_ports.push(message.target_port); + } SyncPortContent::NotificationWave => { // Wave to discover everyone in the network, handling sync // header takes care of leader discovery, here we need to make @@ -483,7 +481,7 @@ impl Consensus { // Check for sent ports debug_assert!(self.workspace_ports.is_empty()); - find_ports_in_value_group(message.content.as_message().unwrap(), &mut self.workspace_ports); + find_ports_in_value_group(&message.content, &mut self.workspace_ports); if !self.workspace_ports.is_empty() { todo!("handle received ports"); self.workspace_ports.clear(); @@ -507,11 +505,6 @@ impl Consensus { } } - if let DataContent::SilentPortNotification = message.content { - // No port can receive a "silent" notification. - return false; - } - let annotation = &self.branch_annotations[branch_id.index as usize]; for expected in &message.data_header.expected_mapping { // If we own the port, then we have an entry in the