Changeset - b1299290279a
[Not reviewed]
0 4 0
MH - 4 years ago 2021-11-29 18:22:16
contact@maxhenger.nl
put sync rounds in messages to leader, new failing test
4 files changed with 99 insertions and 19 deletions:
0 comments (0 inline, 0 general)
src/runtime2/consensus.rs
Show inline comments
 
use crate::collections::VecSet;
 

	
 
use crate::protocol::eval::ValueGroup;
 

	
 
use super::ConnectorId;
 
use super::branch::BranchId;
 
use super::port::{ChannelId, PortIdLocal, PortState};
 
use super::inbox::{
 
    Message, DataHeader, SyncHeader, ChannelAnnotation, BranchMarker,
 
    DataMessage,
 
    SyncCompMessage, SyncCompContent,
 
    SyncPortMessage, SyncPortContent,
 
    SyncControlMessage, SyncControlContent
 
};
 
use super::scheduler::{ComponentCtx, ComponentPortChange, MessageTicket};
 

	
 
struct BranchAnnotation {
 
    channel_mapping: Vec<ChannelAnnotation>,
 
    cur_marker: BranchMarker,
 
}
 

	
 
#[derive(Debug)]
 
pub(crate) struct LocalSolution {
 
    component: ConnectorId,
 
    final_branch_id: BranchId,
 
    sync_round_number: u32,
 
    port_mapping: Vec<(ChannelId, BranchMarker)>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct GlobalSolution {
 
    component_branches: Vec<(ConnectorId, BranchId)>,
 
    component_branches: Vec<(ConnectorId, BranchId, u32)>,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub enum RoundConclusion {
 
    Failure,
 
    Success(BranchId),
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Consensus
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
struct Peer {
 
    id: ConnectorId,
 
    encountered_this_round: bool,
 
    expected_sync_round: u32,
 
}
 

	
 
/// The consensus algorithm. Currently only implemented to find the component
 
/// with the highest ID within the sync region and letting it handle all the
 
/// local solutions.
 
///
 
/// The type itself serves as an experiment to see how code should be organized.
 
// TODO: Flatten all datastructures
 
// TODO: Have a "branch+port position hint" in case multiple operations are
 
//  performed on the same port to prevent repeated lookups
 
// TODO: A lot of stuff should be batched. Like checking all the sync headers
 
//  and sending "I have a higher ID" messages. Should reduce locking by quite a
 
//  bit.
 
// TODO: Needs a refactor. Firstly we have cases where we don't have a branch ID
 
//  but we do want to enumerate all current ports. So put that somewhere in a
 
//  central place. Secondly. Error handling and regular message handling is
 
//  becoming a mess.
 
pub(crate) struct Consensus {
 
    // --- State that is cleared after each round
 
    // Local component's state
 
    highest_connector_id: ConnectorId,
 
    branch_annotations: Vec<BranchAnnotation>, // index is branch ID
 
    branch_markers: Vec<BranchId>, // index is branch marker, maps to branch
 
    // Gathered state from communication
 
    encountered_ports: VecSet<PortIdLocal>, // to determine if we should send "port remains silent" messages.
 
    solution_combiner: SolutionCombiner,
 
    handled_wave: bool, // encountered notification wave in this round
 
    conclusion: Option<RoundConclusion>,
 
    ack_remaining: u32,
 
    // --- Persistent state
 
    peers: Vec<Peer>,
 
    sync_round: u32,
 
    // --- Workspaces
 
    workspace_ports: Vec<PortIdLocal>,
 
}
 

	
 
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 
pub(crate) enum Consistency {
 
    Valid,
 
    Inconsistent,
 
}
 

	
 
#[derive(Debug, PartialEq, Eq)]
 
pub(crate) enum MessageOrigin {
 
    Past,
 
    Present,
 
    Future
 
}
 

	
 
impl Consensus {
 
    pub fn new() -> Self {
 
        return Self {
 
            highest_connector_id: ConnectorId::new_invalid(),
 
            branch_annotations: Vec::new(),
 
            branch_markers: Vec::new(),
 
            encountered_ports: VecSet::new(),
 
            solution_combiner: SolutionCombiner::new(),
 
            handled_wave: false,
 
            conclusion: None,
 
            ack_remaining: 0,
 
            peers: Vec::new(),
 
            sync_round: 0,
 
            workspace_ports: Vec::new(),
 
        }
 
    }
 

	
 
    // --- Controlling sync round and branches
 

	
 
    /// Returns whether the consensus algorithm is running in sync mode
 
    pub fn is_in_sync(&self) -> bool {
 
        return !self.branch_annotations.is_empty();
 
    }
 

	
 
    /// TODO: Remove this once multi-fire is in place
 
    #[deprecated]
 
    pub fn get_annotation(&self, branch_id: BranchId, channel_id: PortIdLocal) -> &ChannelAnnotation {
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 
        let port = branch.channel_mapping.iter().find(|v| v.channel_id.index == channel_id.index).unwrap();
 
        return port;
 
    }
 

	
 
    /// Sets up the consensus algorithm for a new synchronous round. The
 
    /// provided ports should be the ports the component owns at the start of
 
    /// the sync round.
 
    pub fn start_sync(&mut self, ctx: &ComponentCtx) {
 
        debug_assert!(!self.highest_connector_id.is_valid());
 
        debug_assert!(self.branch_annotations.is_empty());
 
        debug_assert!(self.solution_combiner.local.is_empty());
 

	
 
        // We'll use the first "branch" (the non-sync one) to store our ports,
 
        // this allows cloning if we created a new branch.
 
        self.branch_annotations.push(BranchAnnotation{
 
@@ -197,332 +199,335 @@ impl Consensus {
 
            match mapping.expected_firing {
 
                Some(expected) => {
 
                    if expected != mapping.registered_id.is_some() {
 
                        // Inconsistent speculative state and actual state
 
                        debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier
 
                        return Consistency::Inconsistent;
 
                    }
 
                },
 
                None => {},
 
            }
 
        }
 

	
 
        return Consistency::Valid;
 
    }
 

	
 
    /// Notifies the consensus algorithm that a particular branch has assumed
 
    /// a speculative value for its port mapping.
 
    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool, ctx: &ComponentCtx) -> Consistency {
 
        debug_assert!(self.is_in_sync());
 

	
 
        let port_desc = ctx.get_port_by_id(port_id).unwrap();
 
        let channel_id = port_desc.channel_id;
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == channel_id {
 
                match mapping.expected_firing {
 
                    None => {
 
                        // Not yet mapped, perform speculative mapping
 
                        mapping.expected_firing = Some(does_fire);
 
                        return Consistency::Valid;
 
                    },
 
                    Some(current) => {
 
                        // Already mapped
 
                        if current == does_fire {
 
                            return Consistency::Valid;
 
                        } else {
 
                            return Consistency::Inconsistent;
 
                        }
 
                    }
 
                }
 
            }
 
        }
 

	
 
        unreachable!("notify_of_speculative_mapping called with unowned port");
 
    }
 

	
 
    /// Generates a new local solution from a finished branch. If the component
 
    /// is not the leader of the sync region then it will be sent to the
 
    /// appropriate component. If it is the leader then there is a chance that
 
    /// this solution completes a global solution. In that case the solution
 
    /// branch ID will be returned.
 
    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        // Turn the port mapping into a local solution
 
        let source_mapping = &self.branch_annotations[branch_id.index as usize].channel_mapping;
 
        let mut target_mapping = Vec::with_capacity(source_mapping.len());
 

	
 
        for port in source_mapping {
 
            // Note: if the port is silent, and we've never communicated
 
            // over the port, then we need to do so now, to let the peer
 
            // component know about our sync leader state.
 
            let port_desc = ctx.get_port_by_channel_id(port.channel_id).unwrap();
 
            let self_port_id = port_desc.self_id;
 
            let peer_port_id = port_desc.peer_id;
 
            let channel_id = port_desc.channel_id;
 

	
 
            if !self.encountered_ports.contains(&self_port_id) {
 
                let message = SyncPortMessage {
 
                    sync_header: SyncHeader{
 
                        sending_component_id: ctx.id,
 
                        highest_component_id: self.highest_connector_id,
 
                        sync_round: self.sync_round
 
                    },
 
                    source_port: self_port_id,
 
                    target_port: peer_port_id,
 
                    content: SyncPortContent::SilentPortNotification,
 
                };
 
                match ctx.submit_message(Message::SyncPort(message)) {
 
                    Ok(_) => {
 
                        self.encountered_ports.push(self_port_id);
 
                    },
 
                    Err(_) => {
 
                        // Seems like we were done with this branch, but one of
 
                        // the silent ports (in scope) is actually closed
 
                        return self.notify_of_fatal_branch(branch_id, ctx);
 
                    }
 
                }
 
            }
 

	
 
            target_mapping.push((
 
                channel_id,
 
                port.registered_id.unwrap_or(BranchMarker::new_invalid())
 
            ));
 
        }
 

	
 
        let local_solution = LocalSolution{
 
            component: ctx.id,
 
            sync_round_number: self.sync_round,
 
            final_branch_id: branch_id,
 
            port_mapping: target_mapping,
 
        };
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx);
 
        return maybe_conclusion;
 
    }
 

	
 
    /// Notifies the consensus algorithm about the chosen branch to commit to
 
    /// memory (may be the invalid "start" branch)
 
    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // TODO: Handle sending and receiving ports
 
        // Set final ports
 
        let branch = &self.branch_annotations[branch_id.index as usize];
 

	
 
        // Clear out internal storage to defaults
 
        println!("DEBUG: ***** Incrementing sync round stuff");
 
        self.highest_connector_id = ConnectorId::new_invalid();
 
        self.branch_annotations.clear();
 
        self.branch_markers.clear();
 
        self.encountered_ports.clear();
 
        self.solution_combiner.clear();
 
        self.handled_wave = false;
 
        self.conclusion = None;
 
        self.ack_remaining = 0;
 

	
 
        // And modify persistent storage
 
        self.sync_round += 1;
 

	
 
        for peer in self.peers.iter_mut() {
 
            peer.encountered_this_round = false;
 
            peer.expected_sync_round += 1;
 
        }
 

	
 
        println!("DEBUG: ***** Peers post round are:\n{:#?}", &self.peers)
 
    }
 

	
 
    // --- Handling messages
 

	
 
    /// Prepares a message for sending. Caller should have made sure that
 
    /// sending the message is consistent with the speculative state.
 
    pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
 
        debug_assert!(self.is_in_sync());
 
        let branch = &mut self.branch_annotations[branch_id.index as usize];
 
        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
 

	
 
        if cfg!(debug_assertions) {
 
            // Check for consistent mapping
 
            let port = branch.channel_mapping.iter()
 
                .find(|v| v.channel_id == port_info.channel_id)
 
                .unwrap();
 
            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
 
        }
 

	
 
        // Check for ports that are being sent
 
        debug_assert!(self.workspace_ports.is_empty());
 
        find_ports_in_value_group(content, &mut self.workspace_ports);
 
        if !self.workspace_ports.is_empty() {
 
            todo!("handle sending ports");
 
            self.workspace_ports.clear();
 
        }
 

	
 
        // Construct data header
 
        let data_header = DataHeader{
 
            expected_mapping: branch.channel_mapping.iter()
 
                .filter(|v| v.registered_id.is_some() || v.channel_id == port_info.channel_id)
 
                .copied()
 
                .collect(),
 
            sending_port: port_info.self_id,
 
            target_port: port_info.peer_id,
 
            new_mapping: branch.cur_marker,
 
        };
 

	
 
        // Update port mapping
 
        for mapping in &mut branch.channel_mapping {
 
            if mapping.channel_id == port_info.channel_id {
 
                mapping.expected_firing = Some(true);
 
                mapping.registered_id = Some(branch.cur_marker);
 
            }
 
        }
 

	
 
        // Update branch marker
 
        let new_marker = BranchMarker::new(self.branch_markers.len() as u32);
 
        branch.cur_marker = new_marker;
 
        self.branch_markers.push(branch_id);
 

	
 
        self.encountered_ports.push(source_port_id);
 

	
 
        return (self.create_sync_header(ctx), data_header);
 
    }
 

	
 
    /// Handles a new data message by handling the sync header. The caller is
 
    /// responsible for checking for branches that might be able to receive
 
    /// the message.
 
    pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) -> bool {
 
        let message = ctx.read_message_using_ticket(ticket).as_data();
 
        let target_port = message.data_header.target_port;
 
        match self.handle_received_sync_header(message.sync_header, ctx) {
 
            MessageOrigin::Past => return false,
 
            MessageOrigin::Present => {
 
                self.encountered_ports.push(target_port);
 
                return true;
 
            },
 
            MessageOrigin::Future => {
 
                let message = ctx.take_message_using_ticket(ticket);
 
                ctx.put_back_message(message);
 
                return false;
 
            }
 
        }
 
    }
 

	
 
    /// Handles a new sync message by handling the sync header and the contents
 
    /// of the message. Returns `Some` with the branch ID of the global solution
 
    /// if the sync solution has been found.
 
    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        match self.handle_received_sync_header(message.sync_header, ctx) {
 
            MessageOrigin::Past => return None,
 
            MessageOrigin::Present => {},
 
            MessageOrigin::Future => {
 
                ctx.put_back_message(Message::SyncComp(message));
 
                return None
 
            }
 
        }
 

	
 
        // And handle the contents
 
        debug_assert_eq!(message.target_component_id, ctx.id);
 

	
 
        match &message.content {
 
            SyncCompContent::LocalFailure |
 
            SyncCompContent::LocalSolution(_) |
 
            SyncCompContent::PartialSolution(_) |
 
            SyncCompContent::AckFailure |
 
            SyncCompContent::Presence(_) => {
 
                // Needs to be handled by the leader
 
                return self.send_to_leader_or_handle_as_leader(message.content, ctx);
 
            },
 
            SyncCompContent::GlobalSolution(solution) => {
 
                // Found a global solution
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let (_, branch_id) = solution.component_branches.iter()
 
                    .find(|(component_id, _)| *component_id == ctx.id)
 
                let (_, branch_id, _) = solution.component_branches.iter()
 
                    .find(|(component_id, _, _)| *component_id == ctx.id)
 
                    .unwrap();
 
                return Some(RoundConclusion::Success(*branch_id));
 
            },
 
            SyncCompContent::GlobalFailure => {
 
                // Global failure of round, send Ack to leader
 
                println!("DEBUGERINO: Got GlobalFailure, sending Ack in response");
 
                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
 
                let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx);
 
                debug_assert!(_result.is_none());
 
                return Some(RoundConclusion::Failure);
 
            },
 
            SyncCompContent::Notification => {
 
                // We were just interested in the sync header we handled above
 
                return None;
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        match self.handle_received_sync_header(message.sync_header, ctx) {
 
            MessageOrigin::Past => return None,
 
            MessageOrigin::Present => {},
 
            MessageOrigin::Future => {
 
                ctx.put_back_message(Message::SyncPort(message));
 
                return None;
 
            }
 
        }
 

	
 
        debug_assert!(self.is_in_sync());
 
        debug_assert!(ctx.get_port_by_id(message.target_port).is_some());
 
        match message.content {
 
            SyncPortContent::SilentPortNotification => {
 
                // The point here is to let us become part of the sync round and
 
                // take note of the leader in case all of our ports are silent.
 
                self.encountered_ports.push(message.target_port);
 
                return None
 
            }
 
            SyncPortContent::NotificationWave => {
 
                // Wave to discover everyone in the network, handling sync
 
                // header takes care of leader discovery, here we need to make
 
                // sure we propagate the wave
 
                if self.handled_wave {
 
                    return None;
 
                }
 

	
 
                self.handled_wave = true;
 

	
 
                // Propagate wave to all peers except the one that has sent us
 
                // the wave.
 
                for mapping in &self.branch_annotations[0].channel_mapping {
 
                    let channel_id = mapping.channel_id;
 
                    let port_desc = ctx.get_port_by_channel_id(channel_id).unwrap();
 
                    if port_desc.self_id == message.target_port {
 
                        // Wave came from this port, no need to send one back
 
                        continue;
 
                    }
 

	
 
                    let message = SyncPortMessage{
 
                        sync_header: self.create_sync_header(ctx),
 
                        source_port: port_desc.self_id,
 
                        target_port: port_desc.peer_id,
 
                        content: SyncPortContent::NotificationWave,
 
                    };
 
                    // As with the other SyncPort where we throw away the
 
                    // result: we're dealing with an error here anyway
 
                    let _unused = ctx.submit_message(Message::SyncPort(message));
 
                }
 

	
 
                // And let the leader know about our port state
 
                let annotations = &self.branch_annotations[0];
 
                let mut channels = Vec::with_capacity(annotations.channel_mapping.len());
 
                for mapping in &annotations.channel_mapping {
 
                    let port_info = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
 
                    channels.push(LocalChannelPresence{
 
                        channel_id: mapping.channel_id,
 
                        is_closed: port_info.state == PortState::Closed,
 
                    });
 
                }
 

	
 
                let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
 
                    component_id: ctx.id,
 
                    channels,
 
                }), ctx);
 
                return maybe_conclusion;
 
            }
 
        }
 
    }
 

	
 
    pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if message.in_response_to_sync_round < self.sync_round {
 
            // Old message
 
            return None
 
        }
 

	
 
        // Because the message is always sent in response to a message
 
        // originating here, the sync round number can never be larger than the
 
@@ -646,455 +651,476 @@ impl Consensus {
 
                    if sync_header.sync_round < entry.expected_sync_round {
 
                        return MessageOrigin::Past;
 
                    } else if sync_header.sync_round == entry.expected_sync_round {
 
                        return MessageOrigin::Present;
 
                    } else {
 
                        return MessageOrigin::Future;
 
                    }
 
                } else {
 
                    // TODO: Proper handling of potential overflow
 
                    entry.encountered_this_round = true;
 

	
 
                    if sync_header.sync_round >= entry.expected_sync_round {
 
                        entry.expected_sync_round = sync_header.sync_round;
 
                        return MessageOrigin::Present;
 
                    } else {
 
                        return MessageOrigin::Past;
 
                    }
 
                }
 
            },
 
            None => {
 
                self.peers.push(Peer{
 
                    id: sync_header.sending_component_id,
 
                    encountered_this_round: true,
 
                    expected_sync_round: sync_header.sync_round,
 
                });
 
                return MessageOrigin::Present;
 
            }
 
        }
 
    }
 

	
 
    /// Sends a message towards the leader, if already the leader then the
 
    /// message will be handled immediately.
 
    fn send_to_leader_or_handle_as_leader(&mut self, content: SyncCompContent, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.highest_connector_id == ctx.id {
 
            // We are the leader
 
            match content {
 
                SyncCompContent::LocalFailure => {
 
                    if self.solution_combiner.mark_failure_and_check_for_global_failure() {
 
                        return self.handle_global_failure_as_leader(ctx);
 
                    }
 
                },
 
                SyncCompContent::LocalSolution(local_solution) => {
 
                    if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(local_solution) {
 
                        return self.handle_global_solution_as_leader(global_solution, ctx);
 
                    }
 
                },
 
                SyncCompContent::PartialSolution(partial_solution) => {
 
                    if let Some(conclusion) = self.solution_combiner.combine(partial_solution) {
 
                        match conclusion {
 
                            LeaderConclusion::Solution(global_solution) => {
 
                                return self.handle_global_solution_as_leader(global_solution, ctx);
 
                            },
 
                            LeaderConclusion::Failure => {
 
                                return self.handle_global_failure_as_leader(ctx);
 
                            }
 
                        }
 
                    }
 
                },
 
                SyncCompContent::Presence(component_presence) => {
 
                    if self.solution_combiner.add_presence_and_check_for_global_failure(component_presence.component_id, &component_presence.channels) {
 
                        return self.handle_global_failure_as_leader(ctx);
 
                    }
 
                },
 
                SyncCompContent::AckFailure => {
 
                    debug_assert_eq!(Some(RoundConclusion::Failure), self.conclusion);
 
                    debug_assert!(self.ack_remaining > 0);
 
                    self.ack_remaining -= 1;
 
                    if self.ack_remaining == 0 {
 
                        return Some(RoundConclusion::Failure);
 
                    }
 
                }
 
                SyncCompContent::Notification | SyncCompContent::GlobalSolution(_) |
 
                SyncCompContent::GlobalFailure => {
 
                    unreachable!("unexpected message content for leader");
 
                },
 
            }
 
        } else {
 
            // Someone else is the leader
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content,
 
            };
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn handle_global_solution_as_leader(&mut self, global_solution: GlobalSolution, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        if self.conclusion.is_some() {
 
            return None;
 
        }
 

	
 
        // Handle the global solution
 
        let mut my_final_branch_id = BranchId::new_invalid();
 
        for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
 
        for (connector_id, branch_id, sync_round) in global_solution.component_branches.iter().copied() {
 
            if connector_id == ctx.id {
 
                // This is our solution branch
 
                my_final_branch_id = branch_id;
 
                continue;
 
            }
 

	
 
            // Send solution message
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: connector_id,
 
                content: SyncCompContent::GlobalSolution(global_solution.clone()),
 
            };
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 

	
 
            // Update peers as leader. Subsequent call to `end_sync` will update
 
            // the round numbers
 
            match self.peers.iter_mut().find(|v| v.id == connector_id) {
 
                Some(peer) => {
 
                    peer.expected_sync_round = sync_round;
 
                },
 
                None => {
 
                    self.peers.push(Peer{
 
                        id: connector_id,
 
                        expected_sync_round: sync_round,
 
                        encountered_this_round: true,
 
                    });
 
                }
 
            }
 
        }
 

	
 
        debug_assert!(my_final_branch_id.is_valid());
 
        self.conclusion = Some(RoundConclusion::Success(my_final_branch_id));
 
        return Some(RoundConclusion::Success(my_final_branch_id));
 
    }
 

	
 
    fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure());
 
        if self.conclusion.is_some() {
 
            // Already sent out a failure
 
            return None;
 
        }
 

	
 
        // TODO: Performance
 
        let mut encountered = VecSet::new();
 
        for presence in &self.solution_combiner.presence {
 
            if presence.owner_a != ctx.id {
 
                // Did not add it ourselves
 
                if encountered.push(presence.owner_a) {
 
                    // Not yet sent a message
 
                    let message = SyncCompMessage{
 
                        sync_header: self.create_sync_header(ctx),
 
                        target_component_id: presence.owner_a,
 
                        content: SyncCompContent::GlobalFailure,
 
                    };
 
                    ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
                }
 
            } else if let Some(owner_b) = presence.owner_b {
 
            }
 

	
 
            if let Some(owner_b) = presence.owner_b {
 
                if owner_b != ctx.id {
 
                    if encountered.push(owner_b) {
 
                        let message = SyncCompMessage{
 
                            sync_header: self.create_sync_header(ctx),
 
                            target_component_id: owner_b,
 
                            content: SyncCompContent::GlobalFailure,
 
                        };
 
                        ctx.submit_message(Message::SyncComp(message)).unwrap();
 
                    }
 
                }
 
            }
 
        }
 

	
 
        println!("DEBUGERINO: Leader entering error state, we need to wait on {:?}", encountered.iter().map(|v| v.index).collect::<Vec<_>>());
 
        self.conclusion = Some(RoundConclusion::Failure);
 
        if encountered.is_empty() {
 
            // We don't have to wait on Acks
 
            return Some(RoundConclusion::Failure);
 
        } else {
 
            self.ack_remaining = encountered.len() as u32;
 
            return None;
 
        }
 
    }
 

	
 
    fn initiate_sync_failure(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Notify leader of our channels and the fact that we just failed
 
        let channel_mapping = &self.branch_annotations[0].channel_mapping;
 
        let mut channel_presence = Vec::with_capacity(channel_mapping.len());
 
        for mapping in channel_mapping {
 
            let port = ctx.get_port_by_channel_id(mapping.channel_id).unwrap();
 
            channel_presence.push(LocalChannelPresence{
 
                channel_id: mapping.channel_id,
 
                is_closed: port.state == PortState::Closed,
 
            });
 
        }
 
        let maybe_already = self.send_to_leader_or_handle_as_leader(SyncCompContent::Presence(ComponentPresence{
 
            component_id: ctx.id,
 
            channels: channel_presence,
 
        }), ctx);
 

	
 
        if self.handled_wave {
 
            // Someone (or us) has already initiated a sync failure.
 
            return maybe_already;
 
        }
 

	
 
        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx);
 
        debug_assert!(if maybe_already.is_some() { maybe_conclusion.is_some() } else { true });
 
        println!("DEBUG: Maybe conclusion is {:?}", maybe_conclusion);
 

	
 
        // Initiate a discovery wave so peers can do the same
 
        self.handled_wave = true;
 
        for mapping in &self.branch_annotations[0].channel_mapping {
 
            let channel_id = mapping.channel_id;
 
            let port_info = ctx.get_port_by_channel_id(channel_id).unwrap();
 
            let message = SyncPortMessage{
 
                sync_header: self.create_sync_header(ctx),
 
                source_port: port_info.self_id,
 
                target_port: port_info.peer_id,
 
                content: SyncPortContent::NotificationWave,
 
            };
 

	
 
            // Note: submitting the message might fail. But we're attempting to
 
            // handle the error anyway.
 
            // TODO: Think about this a second time: how do we make sure the
 
            //  entire network will fail if we reach this condition
 
            let _unused = ctx.submit_message(Message::SyncPort(message));
 
        }
 

	
 
        return maybe_conclusion;
 
    }
 

	
 
    #[inline]
 
    fn create_sync_header(&self, ctx: &ComponentCtx) -> SyncHeader {
 
        return SyncHeader{
 
            sending_component_id: ctx.id,
 
            highest_component_id: self.highest_connector_id,
 
            sync_round: self.sync_round,
 
        }
 
    }
 

	
 
    fn forward_local_data_to_new_leader(&mut self, ctx: &mut ComponentCtx) {
 
        debug_assert_ne!(self.highest_connector_id, ctx.id);
 

	
 
        if let Some(partial_solution) = self.solution_combiner.drain() {
 
            let message = SyncCompMessage {
 
                sync_header: self.create_sync_header(ctx),
 
                target_component_id: self.highest_connector_id,
 
                content: SyncCompContent::PartialSolution(partial_solution),
 
            };
 
            ctx.submit_message(Message::SyncComp(message)).unwrap(); // unwrap: sending to component instead of through channel
 
        }
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Solution storage and algorithms
 
// -----------------------------------------------------------------------------
 

	
 
// TODO: Remove all debug derives
 

	
 
#[derive(Debug, Clone)]
 
struct MatchedLocalSolution {
 
    final_branch_id: BranchId,
 
    channel_mapping: Vec<(ChannelId, BranchMarker)>,
 
    matches: Vec<ComponentMatches>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ComponentMatches {
 
    target_id: ConnectorId,
 
    target_index: usize,
 
    match_indices: Vec<usize>, // of local solution in connector
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ComponentPeer {
 
    target_id: ConnectorId,
 
    target_index: usize, // in array of global solution components
 
    involved_channels: Vec<ChannelId>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
struct ComponentLocalSolutions {
 
    component: ConnectorId,
 
    sync_round: u32,
 
    peers: Vec<ComponentPeer>,
 
    solutions: Vec<MatchedLocalSolution>,
 
    all_peers_present: bool,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct ComponentPresence {
 
    component_id: ConnectorId,
 
    channels: Vec<LocalChannelPresence>,
 
}
 

	
 
#[derive(Debug, Clone)]
 
pub(crate) struct LocalChannelPresence {
 
    channel_id: ChannelId,
 
    is_closed: bool,
 
}
 

	
 
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 
enum PresenceState {
 
    OnePresent, // one component reported the channel being open
 
    BothPresent, // two components reported the channel being open
 
    Closed, // one component reported the channel being closed
 
}
 

	
 
/// Record to hold channel state during the error-resolving mode of the leader.
 
/// This is used to determine when the sync region has grown to its largest
 
/// size. The structure is eventually consistent in the sense that a component
 
/// might initially presume a channel is open, only to figure out later it is
 
/// actually closed.
 
#[derive(Debug, Clone)]
 
struct ChannelPresence {
 
    owner_a: ConnectorId,
 
    owner_b: Option<ConnectorId>,
 
    id: ChannelId,
 
    state: PresenceState,
 
}
 

	
 
// TODO: Flatten? Flatten. Flatten everything.
 
#[derive(Debug)]
 
pub(crate) struct SolutionCombiner {
 
    local: Vec<ComponentLocalSolutions>, // used for finding solution
 
    presence: Vec<ChannelPresence>, // used to detect all channels present in case of failure
 
    failure_reported: bool,
 
}
 

	
 
struct CheckEntry {
 
    component_index: usize,         // component index in combiner's vector
 
    solution_index: usize,          // solution entry in the above component entry
 
    parent_entry_index: usize,      // parent that caused the creation of this checking entry
 
    match_index_in_parent: usize,   // index in the matches array of the parent
 
    solution_index_in_parent: usize,// index in the solution array of the match entry in the parent
 
}
 

	
 
enum LeaderConclusion {
 
    Solution(GlobalSolution),
 
    Failure,
 
}
 

	
 
impl SolutionCombiner {
 
    fn new() -> Self {
 
        return Self{
 
            local: Vec::new(),
 
            presence: Vec::new(),
 
            failure_reported: false,
 
        };
 
    }
 

	
 
    /// Adds a new local solution to the global solution storage. Will check the
 
    /// new local solutions for matching against already stored local solutions
 
    /// of peer connectors.
 
    fn add_solution_and_check_for_global_solution(&mut self, solution: LocalSolution) -> Option<GlobalSolution> {
 
        let component_id = solution.component;
 
        let sync_round = solution.sync_round_number;
 
        let solution = MatchedLocalSolution{
 
            final_branch_id: solution.final_branch_id,
 
            channel_mapping: solution.port_mapping,
 
            matches: Vec::new(),
 
        };
 

	
 
        // Create an entry for the solution for the particular component
 
        let component_exists = self.local.iter_mut()
 
            .enumerate()
 
            .find(|(_, v)| v.component == component_id);
 
        let (component_index, solution_index, new_component) = match component_exists {
 
            Some((component_index, storage)) => {
 
                // Entry for component exists, so add to solutions
 
                let solution_index = storage.solutions.len();
 
                storage.solutions.push(solution);
 

	
 
                (component_index, solution_index, false)
 
            }
 
            None => {
 
                // Entry for component does not exist yet
 
                let component_index = self.local.len();
 
                self.local.push(ComponentLocalSolutions{
 
                    component: component_id,
 
                    sync_round,
 
                    peers: Vec::new(),
 
                    solutions: vec![solution],
 
                    all_peers_present: false,
 
                });
 
                (component_index, 0, true)
 
            }
 
        };
 

	
 
        // If this is a solution of a component that is new to us, then we check
 
        // in the stored solutions which other components are peers of the new
 
        // one.
 
        if new_component {
 
            let cur_ports = &self.local[component_index].solutions[0].channel_mapping;
 
            let mut component_peers = Vec::new();
 

	
 
            // Find the matching components
 
            for (other_index, other_component) in self.local.iter().enumerate() {
 
                if other_index == component_index {
 
                    // Don't match against ourselves
 
                    continue;
 
                }
 

	
 
                let mut matching_channels = Vec::new();
 
                for (cur_channel_id, _) in cur_ports {
 
                    for (other_channel_id, _) in &other_component.solutions[0].channel_mapping {
 
                        if cur_channel_id == other_channel_id {
 
                            // We have a shared port
 
                            matching_channels.push(*cur_channel_id);
 
                        }
 
                    }
 
                }
 

	
 
                if !matching_channels.is_empty() {
 
                    // We share some ports
 
                    component_peers.push(ComponentPeer{
 
                        target_id: other_component.component,
 
                        target_index: other_index,
 
                        involved_channels: matching_channels,
 
                    });
 
                }
 
            }
 

	
 
            let mut num_ports_in_peers = 0;
 
            for peer in &component_peers {
 
                num_ports_in_peers += peer.involved_channels.len();
 
            }
 

	
 
            if num_ports_in_peers == cur_ports.len() {
 
                // Newly added component has all required peers present
 
                self.local[component_index].all_peers_present = true;
 
            }
 

	
 
            // Add the found component pairing entries to the solution entries
 
            // for the two involved components
 
            for component_match in component_peers {
 
                // Check the other component for having all peers present
 
                let mut num_ports_in_peers = component_match.involved_channels.len();
 
                let other_component = &mut self.local[component_match.target_index];
 
                for existing_peer in &other_component.peers {
 
                    num_ports_in_peers += existing_peer.involved_channels.len();
 
                }
 

	
 
                if num_ports_in_peers == other_component.solutions[0].channel_mapping.len() {
 
                    other_component.all_peers_present = true;
 
                }
 

	
 
                other_component.peers.push(ComponentPeer{
 
                    target_id: component_id,
 
                    target_index: component_index,
 
                    involved_channels: component_match.involved_channels.clone(),
 
                });
 

	
 
                let new_component = &mut self.local[component_index];
 
                new_component.peers.push(component_match);
 
            }
 
        }
 

	
 
        // We're now sure that we know which other components the currently
 
        // considered component is linked up to. Now we need to check those
 
        // entries (if any) to see if any pair of local solutions match
 
        let mut new_component_matches = Vec::new();
 
        let cur_component = &self.local[component_index];
 
        let cur_solution = &cur_component.solutions[solution_index];
 

	
 
        for peer in &cur_component.peers {
 
            let mut new_solution_matches = Vec::new();
 

	
 
            let other_component = &self.local[peer.target_index];
 
            for (other_solution_index, other_solution) in other_component.solutions.iter().enumerate() {
 
                // Check the port mappings between the pair of solutions.
 
                let mut all_matched = true;
 

	
 
                'mapping_check_loop: for (cur_port, cur_branch) in &cur_solution.channel_mapping {
 
                    for (other_port, other_branch) in &other_solution.channel_mapping {
 
                        if cur_port == other_port {
 
                            if cur_branch == other_branch {
 
@@ -1256,285 +1282,286 @@ impl SolutionCombiner {
 
                        // entry, so see if we have a composable pair of
 
                        // solutions.
 
                        if !prev_matching_component.match_indices.contains(&cur_entry.solution_index) {
 
                            all_match = false;
 
                            break 'check_against_existing;
 
                        }
 
                    }
 
                }
 
            }
 

	
 
            if all_match {
 
                // All components matched until now.
 
                if stack.len() == self.local.len() {
 
                    // We have found a global solution
 
                    break 'check_last_stack;
 
                }
 

	
 
                // Not all components found yet, look for a new one that has not
 
                // yet been added yet.
 
                for (parent_index, parent_entry) in stack.iter().enumerate() {
 
                    let parent_component = &self.local[parent_entry.component_index];
 
                    let parent_solution = &parent_component.solutions[parent_entry.solution_index];
 

	
 
                    for (peer_index, peer_component) in parent_solution.matches.iter().enumerate() {
 
                        if peer_component.match_indices.is_empty() {
 
                            continue;
 
                        }
 

	
 
                        let already_added = stack.iter().any(|v| v.component_index == peer_component.target_index);
 
                        if !already_added {
 
                            // New component to try
 
                            stack.push(CheckEntry{
 
                                component_index: peer_component.target_index,
 
                                solution_index: peer_component.match_indices[0],
 
                                parent_entry_index: parent_index,
 
                                match_index_in_parent: peer_index,
 
                                solution_index_in_parent: 0,
 
                            });
 
                            continue 'check_last_stack;
 
                        }
 
                    }
 
                }
 

	
 
                // Cannot find a peer to add. This is possible if, for example,
 
                // we have a component A which has the only connection to
 
                // component B. And B has sent a local solution saying it is
 
                // finished, but the last data message has not yet arrived at A.
 

	
 
                // In any case, we just exit the if statement and handle not
 
                // being able to find a new connector as being forced to try a
 
                // new permutation of possible local solutions.
 
            }
 

	
 
            // Either the currently considered local solution is inconsistent
 
            // with other local solutions, or we cannot find a new component to
 
            // add. This is where we perform backtracking as long as needed to
 
            // try a new solution.
 
            while stack.len() > 1 {
 
                // Check if our parent has another solution we can try
 
                let cur_index = stack.len() - 1;
 
                let cur_entry = &stack[cur_index];
 

	
 
                let parent_entry = &stack[cur_entry.parent_entry_index];
 
                let parent_component = &self.local[parent_entry.component_index];
 
                let parent_solution = &parent_component.solutions[parent_entry.solution_index];
 

	
 
                let match_component = &parent_solution.matches[cur_entry.match_index_in_parent];
 
                debug_assert!(match_component.target_index == cur_entry.component_index);
 
                let new_solution_index_in_parent = cur_entry.solution_index_in_parent + 1;
 

	
 
                if new_solution_index_in_parent < match_component.match_indices.len() {
 
                    // We can still try a new one
 
                    let new_solution_index = match_component.match_indices[new_solution_index_in_parent];
 
                    let cur_entry = &mut stack[cur_index];
 
                    cur_entry.solution_index_in_parent = new_solution_index_in_parent;
 
                    cur_entry.solution_index = new_solution_index;
 
                    continue 'check_last_stack;
 
                } else {
 
                    // We're out of options here. So pop an entry, then in
 
                    // the next iteration of this backtracking loop we try
 
                    // to increment that solution
 
                    stack.pop();
 
                }
 
            }
 

	
 
            // Stack length is 1, hence we're back at our initial solution.
 
            // Since that doesn't yield a global solution, we simply:
 
            return None;
 
        }
 

	
 
        // Constructing the representation of the global solution
 
        debug_assert_eq!(stack.len(), self.local.len());
 
        let mut final_branches = Vec::with_capacity(stack.len());
 
        for entry in &stack {
 
            let component = &self.local[entry.component_index];
 
            let solution = &component.solutions[entry.solution_index];
 
            final_branches.push((component.component, solution.final_branch_id));
 
            final_branches.push((component.component, solution.final_branch_id, component.sync_round));
 
        }
 

	
 
        // Just debugging here, TODO: @remove
 
        let mut total_num_channels = 0;
 
        for entry in &stack {
 
            let component = &self.local[entry.component_index];
 
            total_num_channels += component.solutions[0].channel_mapping.len();
 
        }
 

	
 
        total_num_channels /= 2;
 
        let mut final_mapping = Vec::with_capacity(total_num_channels);
 
        let mut total_num_checked = 0;
 

	
 
        for entry in &stack {
 
            let component = &self.local[entry.component_index];
 
            let solution = &component.solutions[entry.solution_index];
 

	
 
            for (channel_id, branch_id) in solution.channel_mapping.iter().copied() {
 
                match final_mapping.iter().find(|(v, _)| *v == channel_id) {
 
                    Some((_, encountered_branch_id)) => {
 
                        debug_assert_eq!(*encountered_branch_id, branch_id);
 
                        total_num_checked += 1;
 
                    },
 
                    None => {
 
                        final_mapping.push((channel_id, branch_id));
 
                    }
 
                }
 
            }
 
        }
 

	
 
        debug_assert_eq!(total_num_checked, total_num_channels);
 

	
 
        return Some(GlobalSolution{
 
            component_branches: final_branches,
 
            channel_mapping: final_mapping,
 
        });
 
    }
 

	
 
    /// Checks if all preconditions for global sync failure have been met
 
    fn check_for_global_failure(&self) -> bool {
 
        if !self.failure_reported {
 
            return false;
 
        }
 

	
 
        // Failure is reported, if all components are present then we may emit
 
        // the global failure broadcast
 
        // Check if all are present and we're preparing to fail this round
 
        let mut all_present = true;
 
        for presence in &self.presence {
 
            if presence.state == PresenceState::OnePresent {
 
                all_present = false;
 
                break;
 
            }
 
        }
 

	
 
        return all_present; // && failure_reported, which is checked above
 
    }
 

	
 
    /// Turns the entire (partially resolved) global solution into a structure
 
    /// that can be forwarded to a new parent. The new parent may then merge
 
    /// already obtained information.
 
    fn drain(&mut self) -> Option<SolutionCombiner> {
 
        if self.local.is_empty() && self.presence.is_empty() && !self.failure_reported {
 
            return None;
 
        }
 

	
 
        let result = SolutionCombiner{
 
            local: self.local.clone(),
 
            presence: self.presence.clone(),
 
            failure_reported: self.failure_reported,
 
        };
 

	
 
        self.local.clear();
 
        self.presence.clear();
 
        self.failure_reported = false;
 
        return Some(result);
 
    }
 

	
 
    // TODO: Entire routine is quite wasteful. Combine instead of doing all work
 
    //  again.
 
    fn combine(&mut self, combiner: SolutionCombiner) -> Option<LeaderConclusion> {
 
        self.failure_reported = self.failure_reported || combiner.failure_reported;
 

	
 
        // Handle local solutions
 
        if self.local.is_empty() {
 
            // Trivial case
 
            self.local = combiner.local;
 
        } else {
 
            for local in combiner.local {
 
                for matched in local.solutions {
 
                    let local_solution = LocalSolution{
 
                        component: local.component,
 
                        sync_round_number: local.sync_round,
 
                        final_branch_id: matched.final_branch_id,
 
                        port_mapping: matched.channel_mapping,
 
                    };
 
                    let maybe_solution = self.add_solution_and_check_for_global_solution(local_solution);
 
                    if let Some(global_solution) = maybe_solution {
 
                        return Some(LeaderConclusion::Solution(global_solution));
 
                    }
 
                }
 
            }
 
        }
 

	
 
        // Handle channel presence
 
        println!("DEBUGERINO: Presence before joining is {:#?}", &self.presence);
 
        if self.presence.is_empty() {
 
            // Trivial case
 
            self.presence = combiner.presence;
 
            println!("DEBUGERINO: Trivial merging")
 
        } else {
 
            for presence in combiner.presence {
 
                match self.presence.iter_mut().find(|v| v.id == presence.id) {
 
                    Some(entry) => {
 
                        // Combine entries. Take first that has Closed, then
 
                        // check first that has both, then check if they are
 
                        // combinable
 
                        if entry.state == PresenceState::Closed {
 
                            // Do nothing
 
                        } else if presence.state == PresenceState::Closed {
 
                            entry.owner_a = presence.owner_a;
 
                            entry.owner_b = presence.owner_b;
 
                            entry.state = PresenceState::Closed;
 
                        } else if entry.state == PresenceState::BothPresent {
 
                            // Again: do nothing
 
                        } else if presence.state == PresenceState::BothPresent {
 
                            entry.owner_a = presence.owner_a;
 
                            entry.owner_b = presence.owner_b;
 
                            entry.state = PresenceState::BothPresent;
 
                        } else {
 
                            // Both have one presence, combine into both present
 
                            debug_assert!(entry.state == PresenceState::OnePresent && presence.state == PresenceState::OnePresent);
 
                            entry.owner_b = Some(presence.owner_a);
 
                            entry.state = PresenceState::BothPresent;
 
                        }
 
                    },
 
                    None => {
 
                        self.presence.push(presence);
 
                    }
 
                }
 
            }
 
            println!("DEBUGERINO: Presence after joining is {:#?}", &self.presence);
 

	
 
            // After adding everything we might have immediately found a solution
 
            if self.check_for_global_failure() {
 
                println!("DEBUG: Returning immediate failure?");
 
                return Some(LeaderConclusion::Failure);
 
            }
 
        }
 

	
 
        return None;
 
    }
 

	
 
    fn clear(&mut self) {
 
        self.local.clear();
 
        self.presence.clear();
 
        self.failure_reported = false;
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Generic Helpers
 
// -----------------------------------------------------------------------------
 

	
 
/// Recursively goes through the value group, attempting to find ports.
 
/// Duplicates will only be added once.
 
pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec<PortIdLocal>) {
 
    // Helper to check a value for a port and recurse if needed.
 
    use crate::protocol::eval::Value;
 

	
 
    fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec<PortIdLocal>) {
 
        match value {
 
            Value::Input(port_id) | Value::Output(port_id) => {
 
                // This is an actual port
 
                let cur_port = PortIdLocal::new(port_id.0.u32_suffix);
 
                for prev_port in ports.iter() {
 
                    if *prev_port == cur_port {
 
                        // Already added
 
                        return;
 
                    }
 
                }
 

	
 
                ports.push(cur_port);
 
            },
 
            Value::Array(heap_pos) |
 
            Value::Message(heap_pos) |
 
            Value::String(heap_pos) |
 
            Value::Struct(heap_pos) |
 
            Value::Union(_, heap_pos) => {
src/runtime2/scheduler.rs
Show inline comments
 
@@ -55,193 +55,203 @@ impl Scheduler {
 

	
 
            // Keep running until we should no longer immediately schedule the
 
            // connector.
 
            let mut cur_schedule = ConnectorScheduling::Immediate;
 
            while let ConnectorScheduling::Immediate = cur_schedule {
 
                self.handle_inbox_messages(scheduled);
 

	
 
                // Run the main behaviour of the connector, depending on its
 
                // current state.
 
                if scheduled.shutting_down {
 
                    // Nothing to do. But we're stil waiting for all our pending
 
                    // control messages to be answered.
 
                    self.debug_conn(connector_id, &format!("Shutting down, {} Acks remaining", scheduled.router.num_pending_acks()));
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // We're actually done, we can safely destroy the
 
                        // currently running connector
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    } else {
 
                        cur_schedule = ConnectorScheduling::NotNow;
 
                    }
 
                } else {
 
                    self.debug_conn(connector_id, "Running ...");
 
                    let scheduler_ctx = SchedulerCtx{ runtime: &*self.runtime };
 
                    let new_schedule = scheduled.connector.run(scheduler_ctx, &mut scheduled.ctx);
 
                    self.debug_conn(connector_id, &format!("Finished running (new scheduling is {:?})", new_schedule));
 

	
 
                    // Handle all of the output from the current run: messages to
 
                    // send and connectors to instantiate.
 
                    self.handle_changes_in_context(scheduled);
 

	
 
                    cur_schedule = new_schedule;
 
                }
 
            }
 

	
 
            // If here then the connector does not require immediate execution.
 
            // So enqueue it if requested, and otherwise put it in a sleeping
 
            // state.
 
            match cur_schedule {
 
                ConnectorScheduling::Immediate => unreachable!(),
 
                ConnectorScheduling::Later => {
 
                    // Simply queue it again later
 
                    self.runtime.push_work(connector_key);
 
                },
 
                ConnectorScheduling::NotNow => {
 
                    // Need to sleep, note that we are the only ones which are
 
                    // allows to set the sleeping state to `true`, and since
 
                    // we're running it must currently be `false`.
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
                ConnectorScheduling::Exit => {
 
                    // Prepare for exit. Set the shutdown flag and broadcast
 
                    // messages to notify peers of closing channels
 
                    scheduled.shutting_down = true;
 
                    for port in &scheduled.ctx.ports {
 
                        if port.state != PortState::Closed {
 
                            let message = scheduled.router.prepare_closing_channel(
 
                                port.self_id, port.peer_id,
 
                                connector_id
 
                            );
 
                            self.debug_conn(connector_id, &format!("Sending message to {:?} [ exit ] \n --- {:?}", port.peer_connector, message));
 
                            self.runtime.send_message(port.peer_connector, Message::Control(message));
 
                        }
 
                    }
 

	
 
                    // Any messages still in the public inbox should be handled
 
                    scheduled.ctx.inbox.clear_read_messages();
 
                    while let Some(ticket) = scheduled.ctx.get_next_message_ticket_even_if_not_in_sync() {
 
                        let message = scheduled.ctx.take_message_using_ticket(ticket);
 
                        self.handle_message_while_shutting_down(message, scheduled);
 
                    }
 

	
 
                    if scheduled.router.num_pending_acks() == 0 {
 
                        // All ports (if any) already closed
 
                        self.runtime.destroy_component(connector_key);
 
                        continue 'thread_loop;
 
                    }
 

	
 
                    self.try_go_to_sleep(connector_key, scheduled);
 
                },
 
            }
 
        }
 
    }
 

	
 
    /// Receiving messages from the public inbox and handling them or storing
 
    /// them in the component's private inbox
 
    fn handle_inbox_messages(&mut self, scheduled: &mut ScheduledConnector) {
 
        let connector_id = scheduled.ctx.id;
 

	
 
        while let Some(message) = scheduled.public.inbox.take_message() {
 
            // Check if the message has to be rerouted because we have moved the
 
            // target port to another component.
 
            self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));
 
            if let Some(target_port) = message.target_port() {
 
                if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
 
                    self.debug_conn(connector_id, " ... Rerouting the message");
 

	
 
                    // We insert directly into the private inbox. Since we have
 
                    // a reroute entry the component can not yet be running.
 
                    if let Message::Control(_) = &message {
 
                        self.runtime.send_message(other_component_id, message);
 
                    } else {
 
                        let key = unsafe { ConnectorKey::from_id(other_component_id) };
 
                        let component = self.runtime.get_component_private(&key);
 
                        component.ctx.inbox.insert_new(message);
 
                    }
 

	
 
                    continue;
 
                }
 

	
 
                match scheduled.ctx.get_port_by_id(target_port) {
 
                    Some(port_info) => {
 
                        if port_info.state == PortState::Closed {
 
                            // We're no longer supposed to receive messages
 
                            // (rerouted message arrived much later!)
 
                            continue
 
                        }
 
                    },
 
                    None => {
 
                        // Apparently we no longer have a handle to the port
 
                        continue;
 
                    }
 
                }
 
            }
 

	
 
            // If here, then we should handle the message
 
            self.debug_conn(connector_id, " ... Handling the message");
 
            if let Message::Control(message) = &message {
 
                match message.content {
 
                    ControlContent::PortPeerChanged(port_id, new_target_connector_id) => {
 
                        // Need to change port target
 
                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                        port.peer_connector = new_target_connector_id;
 

	
 
                        // Note: for simplicity we program the scheduler to always finish
 
                        // running a connector with an empty outbox. If this ever changes
 
                        // then accepting the "port peer changed" message implies we need
 
                        // to change the recipient of the message in the outbox.
 
                        debug_assert!(scheduled.ctx.outbox.is_empty());
 

	
 
                        // And respond with an Ack
 
                        let ack_message = Message::Control(ControlMessage {
 
                            id: message.id,
 
                            sending_component_id: connector_id,
 
                            content: ControlContent::Ack,
 
                        });
 
                        self.debug_conn(connector_id, &format!("Sending message to {:?} [pp ack]\n --- {:?}", message.sending_component_id, ack_message));
 
                        self.runtime.send_message(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::CloseChannel(port_id) => {
 
                        // Mark the port as being closed
 
                        let port = scheduled.ctx.get_port_mut_by_id(port_id).unwrap();
 
                        port.state = PortState::Closed;
 

	
 
                        // Send an Ack
 
                        let ack_message = Message::Control(ControlMessage {
 
                            id: message.id,
 
                            sending_component_id: connector_id,
 
                            content: ControlContent::Ack,
 
                        });
 
                        self.debug_conn(connector_id, &format!("Sending message to {:?} [cc ack] \n --- {:?}", message.sending_component_id, ack_message));
 
                        self.runtime.send_message(message.sending_component_id, ack_message);
 
                    },
 
                    ControlContent::Ack => {
 
                        if let Some(component_key) = scheduled.router.handle_ack(message.id) {
 
                            self.runtime.push_work(component_key);
 
                        };
 
                    },
 
                    ControlContent::Ping => {},
 
                }
 
            } else {
 
                // Not a control message
 
                if scheduled.shutting_down {
 
                    // Since we're shutting down, we just want to respond with a
 
                    // message saying the message did not arrive.
 
                    debug_assert!(scheduled.ctx.inbox.get_next_message_ticket().is_none()); // public inbox should be completely cleared
 
                    self.handle_message_while_shutting_down(message, scheduled);
 
                } else {
 
                    scheduled.ctx.inbox.insert_new(message);
 
                }
 
            }
 
        }
 
    }
 

	
 
    fn handle_message_while_shutting_down(&mut self, message: Message, scheduled: &mut ScheduledConnector) {
 
        let target_port_and_round_number = match message {
 
            Message::Data(msg) => Some((msg.data_header.target_port, msg.sync_header.sync_round)),
 
            Message::SyncComp(_) => None,
 
            Message::SyncPort(msg) => Some((msg.target_port, msg.sync_header.sync_round)),
 
            Message::SyncControl(_) => None,
 
            Message::Control(_) => None,
 
        };
 

	
 
        if let Some((target_port, sync_round)) = target_port_and_round_number {
 
            // This message is aimed at a port, but we're shutting down, so
 
            // notify the peer that its was not received properly.
 
            // (also: since we're shutting down, we're not in sync mode and
 
            // the context contains the definitive set of owned ports)
 
            let port = scheduled.ctx.get_port_by_id(target_port).unwrap();
 
            if port.state == PortState::Open {
 
                let message = SyncControlMessage {
 
                    in_response_to_sync_round: sync_round,
 
                    target_component_id: port.peer_connector,
src/runtime2/tests/mod.rs
Show inline comments
 
mod network_shapes;
 
mod api_component;
 
mod speculation;
 
mod data_transmission;
 
mod sync_failure;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
pub(crate) const NUM_THREADS: u32 =  8;     // number of threads in runtime
 
pub(crate) const NUM_INSTANCES: u32 = 1500; // number of test instances constructed
 
pub(crate) const NUM_LOOPS: u32 = 10;       // number of loops within a single test (not used by all tests)
 
// pub(crate) const NUM_THREADS: u32 =  8;     // number of threads in runtime
 
// pub(crate) const NUM_INSTANCES: u32 = 750;  // number of test instances constructed
 
// pub(crate) const NUM_LOOPS: u32 = 10;       // number of loops within a single test (not used by all tests)
 

	
 
// pub(crate) const NUM_THREADS: u32 = 6;
 
// pub(crate) const NUM_INSTANCES: u32 = 1;
 
// pub(crate) const NUM_LOOPS: u32 = 15;
 
pub(crate) const NUM_THREADS: u32 = 6;
 
pub(crate) const NUM_INSTANCES: u32 = 1;
 
pub(crate) const NUM_LOOPS: u32 = 1;
 

	
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
}
 

	
 
fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor: F) {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes())
 
        .expect("parse PDL");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    let mut api = runtime.create_interface();
 
    for _ in 0..NUM_INSTANCES {
 
        constructor(&mut api);
 
    }
 
}
 

	
 
pub(crate) struct TestTimer {
 
    name: &'static str,
 
    started: std::time::Instant
 
}
 

	
 
impl TestTimer {
 
    pub(crate) fn new(name: &'static str) -> Self {
 
        Self{ name, started: std::time::Instant::now() }
 
    }
 
}
 

	
 
impl Drop for TestTimer {
 
    fn drop(&mut self) {
 
        let delta = std::time::Instant::now() - self.started;
 
        let nanos = (delta.as_secs_f64() * 1_000_000.0) as u64;
 
        let millis = nanos / 1000;
 
        let nanos = nanos % 1000;
 
        println!("[{}] Took {:>4}.{:03} ms", self.name, millis, nanos);
 
    }
 
}
src/runtime2/tests/sync_failure.rs
Show inline comments
 
// sync_failure.rs
 
//
 
// Various tests to ensure that failing components fail in a consistent way.
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_local_sync_failure() {
 
    // If the component exits cleanly, then the runtime exits cleanly, and the
 
    // test will finish
 
    const CODE: &'static str = "
 
    primitive immediate_failure_inside_sync() {
 
        u32[] only_allows_index_0 = { 1 };
 
        while (true) sync { // note the infinite loop
 
            auto value = only_allows_index_0[1];
 
        }
 
    }
 

	
 
    primitive immediate_failure_outside_sync() {
 
        u32[] only_allows_index_0 = { 1 };
 
        auto never_gonna_get = only_allows_index_0[1];
 
        while (true) sync {}
 
    }
 
    ";
 

	
 
    // let thing = TestTimer::new("local_sync_failure");
 
    run_test_in_runtime(CODE, |api| {
 
        api.create_connector("", "immediate_failure_outside_sync", ValueGroup::new_stack(Vec::new()))
 
            .expect("create component");
 

	
 
        api.create_connector("", "immediate_failure_inside_sync", ValueGroup::new_stack(Vec::new()))
 
            .expect("create component");
 
    })
 
}
 

	
 
const SHARED_SYNC_CODE: &'static str = "
 
enum Location { BeforeSync, AfterPut, AfterGet, AfterSync, Never }
 
primitive failing_at_location(in<bool> input, out<bool> output, Location loc) {
 
    u32[] failure_array = {};
 
    while (true) {
 
        if (loc == Location::BeforeSync) failure_array[0];
 
        sync {
 
            put(output, true);
 
            if (loc == Location::AfterPut) failure_array[0];
 
            auto received = get(input);
 
            assert(received);
 
            if (loc == Location::AfterGet) failure_array[0];
 
        }
 
        if (loc == Location::AfterSync) failure_array[0];
 
    }
 
}
 

	
 
composite constructor_a(Location loc) {
 
composite constructor_pair_a(Location loc) {
 
    channel output_a -> input_a;
 
    channel output_b -> input_b;
 
    new failing_at_location(input_b, output_a, loc);
 
    new failing_at_location(input_a, output_b, Location::Never);
 
}
 

	
 
composite constructor_b(Location loc) {
 
composite constructor_pair_b(Location loc) {
 
    channel output_a -> input_a;
 
    channel output_b -> input_b;
 
    new failing_at_location(input_b, output_a, Location::Never);
 
    new failing_at_location(input_a, output_b, loc);
 
}";
 
}
 

	
 
composite constructor_ring(u32 ring_size, u32 fail_a, Location loc_a, u32 fail_b, Location loc_b) {
 
    channel output_first -> input_old;
 
    channel output_cur -> input_new;
 

	
 
    u32 ring_index = 0;
 
    while (ring_index < ring_size) {
 
        auto cur_loc = Location::Never;
 
        if (ring_index == fail_a) cur_loc = loc_a;
 
        if (ring_index == fail_b) cur_loc = loc_b;
 

	
 
        new failing_at_location(input_old, output_cur, cur_loc);
 

	
 
        if (ring_index == ring_size - 2) {
 
            // Don't create a new channel, join up the last one
 
            output_cur = output_first;
 
            input_old = input_new;
 
        } else if (ring_index != ring_size - 1) {
 
            channel output_fresh -> input_fresh;
 
            input_old = input_new;
 
            output_cur = output_fresh;
 
            input_new = input_fresh;
 
        }
 

	
 
        ring_index += 1;
 
    }
 
}
 
";
 

	
 
#[test]
 
fn test_shared_sync_failure_variant_a() {
 
fn test_shared_sync_failure_pair_variant_a() {
 
    // One fails, the other one should somehow detect it and fail as well. This
 
    // variant constructs the failing component first.
 
    run_test_in_runtime(SHARED_SYNC_CODE, |api| {
 
        for variant in 0..4 { // all `Location` enum variants, except `Never`.
 
            // Create the channels
 
            api.create_connector("", "constructor_a", ValueGroup::new_stack(vec![
 
            api.create_connector("", "constructor_pair_a", ValueGroup::new_stack(vec![
 
                Value::Enum(variant)
 
            ])).expect("create connector");
 
        }
 
    })
 
}
 

	
 
#[test]
 
fn test_shared_sync_failure_variant_b() {
 
fn test_shared_sync_failure_pair_variant_b() {
 
    // One fails, the other one should somehow detect it and fail as well. This
 
    // variant constructs the successful component first.
 
    run_test_in_runtime(SHARED_SYNC_CODE, |api| {
 
        for variant in 0..4 {
 
            api.create_connector("", "constructor_b", ValueGroup::new_stack(vec![
 
            api.create_connector("", "constructor_pair_b", ValueGroup::new_stack(vec![
 
                Value::Enum(variant)
 
            ])).expect("create connector");
 
        }
 
    })
 
}
 

	
 
#[test]
 
fn test_shared_sync_failure_ring_variant_a() {
 
    // Only one component in the ring should fail
 
    const RING_SIZE: u32 = 4;
 
    run_test_in_runtime(SHARED_SYNC_CODE, |api| {
 
        for variant in 0..4 {
 
            api.create_connector("", "constructor_ring", ValueGroup::new_stack(vec![
 
                Value::UInt32(RING_SIZE),
 
                Value::UInt32(RING_SIZE / 2), Value::Enum(variant), // fail "halfway" the ring
 
                Value::UInt32(RING_SIZE), Value::Enum(0), // never occurs, index is equal to ring size
 
            ])).expect("create connector");
 
        }
 
    })
 
}
 
\ No newline at end of file
0 comments (0 inline, 0 general)