diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 37401255e873261b6b31e02fe0431fbca980a740..d15b6edfdaf3699bb4e01ca26bd67e073551fd28 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -12,7 +12,7 @@ use super::inbox::{ SyncPortMessage, SyncPortContent, SyncControlMessage, SyncControlContent }; -use super::scheduler::{ComponentCtx, ComponentPortChange}; +use super::scheduler::{ComponentCtx, ComponentPortChange, MessageTicket}; struct BranchAnnotation { channel_mapping: Vec, @@ -88,6 +88,13 @@ pub(crate) enum Consistency { Inconsistent, } +#[derive(PartialEq, Eq)] +pub(crate) enum MessageOrigin { + Past, + Present, + Future +} + impl Consensus { pub fn new() -> Self { return Self { @@ -375,20 +382,34 @@ impl Consensus { /// Handles a new data message by handling the sync header. The caller is /// responsible for checking for branches that might be able to receive /// the message. - pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool { - let handled = self.handle_received_sync_header(&message.sync_header, ctx); - if handled { - self.encountered_ports.push(message.data_header.target_port); + pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) -> bool { + let message = ctx.read_message_using_ticket(ticket).as_data(); + let target_port = message.data_header.target_port; + match self.handle_received_sync_header(message.sync_header, ctx) { + MessageOrigin::Past => return false, + MessageOrigin::Present => { + self.encountered_ports.push(target_port); + return true; + }, + MessageOrigin::Future => { + let message = ctx.take_message_using_ticket(ticket); + ctx.put_back_message(message); + return false; + } } - return handled; } /// Handles a new sync message by handling the sync header and the contents /// of the message. Returns `Some` with the branch ID of the global solution /// if the sync solution has been found. pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option { - if !self.handle_received_sync_header(&message.sync_header, ctx) { - return None; + match self.handle_received_sync_header(message.sync_header, ctx) { + MessageOrigin::Past => return None, + MessageOrigin::Present => {}, + MessageOrigin::Future => { + ctx.put_back_message(Message::SyncComp(message)); + return None + } } // And handle the contents @@ -427,8 +448,13 @@ impl Consensus { } pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) -> Option { - if !self.handle_received_sync_header(&message.sync_header, ctx) { - return None; + match self.handle_received_sync_header(message.sync_header, ctx) { + MessageOrigin::Past => return None, + MessageOrigin::Present => {}, + MessageOrigin::Future => { + ctx.put_back_message(Message::SyncPort(message)); + return None; + } } debug_assert!(self.is_in_sync()); @@ -482,11 +508,11 @@ impl Consensus { }); } - let maybe_conlusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{ + let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{ component_id: ctx.id, channels, }), ctx); - return maybe_conlusion; + return maybe_conclusion; } } } @@ -497,6 +523,10 @@ impl Consensus { return None } + // Because the message is always sent in response to a message + // originating here, the sync round number can never be larger than the + // currently stored one. + debug_assert_eq!(message.in_response_to_sync_round, self.sync_round); match message.content { SyncControlContent::ChannelIsClosed(_) => { return self.initiate_sync_failure(ctx); @@ -560,11 +590,12 @@ impl Consensus { // --- Internal helpers - fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool { + fn handle_received_sync_header(&mut self, sync_header: SyncHeader, ctx: &mut ComponentCtx) -> MessageOrigin { debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves - if !self.handle_peer(sync_header) { - // We can drop this package - return false; + let origin = self.handle_peer(&sync_header); + if origin != MessageOrigin::Present { + // We do not have to handle it now + return origin; } if sync_header.highest_component_id > self.highest_connector_id { @@ -598,23 +629,35 @@ impl Consensus { ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel } // else: exactly equal, so do nothing - return true; + return MessageOrigin::Present; } /// Handles a (potentially new) peer. Returns `false` if the provided sync /// number is different then the expected one. - fn handle_peer(&mut self, sync_header: &SyncHeader) -> bool { + fn handle_peer(&mut self, sync_header: &SyncHeader) -> MessageOrigin { let position = self.peers.iter().position(|v| v.id == sync_header.sending_component_id); match position { Some(index) => { let entry = &mut self.peers[index]; - entry.encountered_this_round = true; - // TODO: Proper handling of potential overflow - if sync_header.sync_round >= entry.expected_sync_round { - entry.expected_sync_round = sync_header.sync_round; - return true; + if entry.encountered_this_round { + // Already encountered this round + if sync_header.sync_round < entry.expected_sync_round { + return MessageOrigin::Past; + } else if sync_header.sync_round == entry.expected_sync_round { + return MessageOrigin::Present; + } else { + return MessageOrigin::Future; + } } else { - return false; + // TODO: Proper handling of potential overflow + entry.encountered_this_round = true; + + if sync_header.sync_round >= entry.expected_sync_round { + entry.expected_sync_round = sync_header.sync_round; + return MessageOrigin::Present; + } else { + return MessageOrigin::Past; + } } }, None => { @@ -623,7 +666,7 @@ impl Consensus { encountered_this_round: true, expected_sync_round: sync_header.sync_round, }); - return true; + return MessageOrigin::Present; } } }