diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index bd8510e2c1bd789afe73cebd37e385d9acf2b32a..8e5e985f91578aa95625780c1bd0b1d30e83042a 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -6,8 +6,11 @@ use super::ConnectorId; use super::branch::BranchId; use super::port::{ChannelId, PortIdLocal}; use super::inbox::{ - Message, ChannelAnnotation, BranchMarker, DataMessage, DataHeader, - SyncCompMessage, SyncCompContent, SyncPortMessage, SyncPortContent, SyncHeader, + Message, DataHeader, SyncHeader, ChannelAnnotation, BranchMarker, + DataMessage, + SyncCompMessage, SyncCompContent, + SyncPortMessage, SyncPortContent, + SyncControlMessage, SyncControlContent }; use super::scheduler::{ComponentCtx, ComponentPortChange}; @@ -469,6 +472,19 @@ impl Consensus { } } + pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option { + if message.in_response_to_sync_round < self.sync_round { + // Old message + return None + } + + match message.content { + SyncControlContent::ChannelIsClosed(_) => { + return Some(RoundConclusion::Failure); + } + } + } + pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage, ctx: &ComponentCtx) { debug_assert!(self.branch_can_receive(branch_id, message));