Files @ 98aadfccbafd
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox.rs

98aadfccbafd 12.2 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
solving problem of connectors shutting down
/**
inbox.rs

Contains various types of inboxes and message types for the connectors. There
are two kinds of inboxes:

The `PublicInbox` is a simple message queue. Messages are put in by various
threads, and they're taken out by a single thread. These messages may contain
control messages and may be filtered or redirected by the scheduler.

The `PrivateInbox` is a temporary storage for all messages that are received
within a certain sync-round.
**/

use std::collections::VecDeque;
use std::sync::Mutex;

use super::ConnectorId;
use crate::protocol::eval::ValueGroup;
use super::connector::BranchId;
use super::port::PortIdLocal;

/// A message that has been delivered (after being imbued with the receiving
/// port by the scheduler) to a connector.
// TODO: Remove Debug on messages
#[derive(Debug, Clone)]
pub struct DataMessage {
    pub sending_port: PortIdLocal,
    pub sender_prev_branch_id: BranchId,
    pub sender_cur_branch_id: BranchId,
    pub message: ValueGroup,
}

#[derive(Debug, Clone)]
pub enum SyncBranchConstraint {
    SilentPort(PortIdLocal),
    BranchNumber(BranchId),
    PortMapping(PortIdLocal, BranchId),
}

#[derive(Debug, Clone)]
pub struct SyncConnectorSolution {
    pub connector_id: ConnectorId,
    pub terminating_branch_id: BranchId,
    pub execution_branch_ids: Vec<BranchId>, // no particular ordering of IDs enforced
    pub final_port_mapping: Vec<(PortIdLocal, BranchId)>
}

#[derive(Debug, Clone)]
pub struct SyncConnectorConstraints {
    pub connector_id: ConnectorId,
    pub constraints: Vec<SyncBranchConstraint>,
}

#[derive(Debug, Clone)]
pub struct SyncMessage {
    pub local_solutions: Vec<SyncConnectorSolution>,
    pub constraints: Vec<SyncConnectorConstraints>,
    pub to_visit: Vec<ConnectorId>,
}

// TODO: Shouldn't really be here, right?
impl SyncMessage {
    /// Creates a new sync message. Assumes that it is created by a connector
    /// that has just encountered a new local solution.
    pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self {
        let mut local_solutions = Vec::with_capacity(approximate_peers);
        local_solutions.push(initial_solution);

        return Self{
            local_solutions,
            constraints: Vec::with_capacity(approximate_peers),
            to_visit: Vec::with_capacity(approximate_peers),
        };
    }

    /// Checks if a connector has already provided a local solution
    pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool {
        return self.local_solutions
            .iter()
            .any(|v| v.connector_id == connector_id);
    }

    /// Adds a new constraint. If the connector has already provided a local
    /// solution then the constraint will be checked. Otherwise the constraint
    /// will be added to the solution. If this is the first constraint for a
    /// connector then it will be added to the connectors that still have to be
    /// visited.
    ///
    /// If this returns true then the constraint was added, or the local
    /// solution for the specified connector satisfies the constraint. If this
    /// function returns an error then we're dealing with a nefarious peer.
    pub(crate) fn add_or_check_constraint(
        &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint
    ) -> Result<bool, ()> {
        if self.has_local_solution_for(connector_id) {
            return self.check_constraint(connector_id, constraint);
        } else {
            self.add_constraint(connector_id, constraint);
            return Ok(true);
        }
    }

    /// Pushes a new connector constraint. Caller must ensure that the solution
    /// has not yet arrived at the specified connector (because then it would no
    /// longer have constraints, but a proposed solution instead).
    pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) {
        debug_assert!(!self.has_local_solution_for(connector_id));

        let position = self.constraints
            .iter()
            .position(|v| v.connector_id == connector_id);

        match position {
            Some(index) => {
                // Has pre-existing constraints
                debug_assert!(self.to_visit.contains(&connector_id));
                let entry = &mut self.constraints[index];
                entry.constraints.push(constraint);
            },
            None => {
                debug_assert!(!self.to_visit.contains(&connector_id));
                self.constraints.push(SyncConnectorConstraints{
                    connector_id,
                    constraints: vec![constraint],
                });
                self.to_visit.push(connector_id);
            }
        }
    }

    /// Checks if a constraint is satisfied by a solution. Caller must make sure
    /// that a local solution has already been provided. Will return an error
    /// value only if the provided constraint does not make sense (i.e. a
    /// nefarious peer has supplied a constraint with a port we do not own).
    pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result<bool, ()>  {
        debug_assert!(self.has_local_solution_for(connector_id));

        let entry = self.local_solutions
            .iter()
            .find(|v| v.connector_id == connector_id)
            .unwrap();

        match constraint {
            SyncBranchConstraint::SilentPort(silent_port_id) => {
                for (port_id, mapped_id) in &entry.final_port_mapping {
                    if *port_id == silent_port_id {
                        // If silent, then mapped ID is invalid
                        return Ok(!mapped_id.is_valid())
                    }
                }

                return Err(());
            },
            SyncBranchConstraint::BranchNumber(expected_branch_id) => {
                return Ok(entry.execution_branch_ids.contains(&expected_branch_id));
            },
            SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => {
                for (port_id, mapped_id) in &entry.final_port_mapping {
                    if port_id == port_id {
                        return Ok(*mapped_id == expected_branch_id);
                    }
                }

                return Err(());
            },
        }
    }
}

#[derive(Debug, Clone)]
pub struct SolutionMessage {
    pub comparison_number: u64,
    pub connector_origin: ConnectorId,
    pub local_solutions: Vec<(ConnectorId, BranchId)>,
    pub to_visit: Vec<ConnectorId>,
}

/// A control message. These might be sent by the scheduler to notify eachother
/// of asynchronous state changes.
#[derive(Debug, Clone)]
pub struct ControlMessage {
    pub id: u32, // generic identifier, used to match request to response
    pub content: ControlMessageVariant,
}

#[derive(Debug, Clone)]
pub enum ControlMessageVariant {
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
    Ack, // acknowledgement of previous control message, matching occurs through control message ID.
}

/// Generic message contents.
#[derive(Debug, Clone)]
pub enum MessageContents {
    Data(DataMessage),              // data message, handled by connector
    Sync(SyncMessage),              // sync message, handled by both connector/scheduler
    RequestCommit(SolutionMessage), // solution message, requesting participants to commit
    ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to
    Control(ControlMessage),        // control message, handled by scheduler
    Ping,                           // ping message, intentionally waking up a connector (used for native connectors)
}

#[derive(Debug)]
pub struct Message {
    pub sending_connector: ConnectorId,
    pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector)
    pub contents: MessageContents,
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieved from it. Non-owning threads may only put new
/// messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    messages: Mutex<VecDeque<Message>>,
}

impl PublicInbox {
    pub fn new() -> Self {
        Self{
            messages: Mutex::new(VecDeque::new()),
        }
    }

    pub fn insert_message(&self, message: Message) {
        let mut lock = self.messages.lock().unwrap();
        lock.push_back(message);
    }

    pub fn take_message(&self) -> Option<Message> {
        let mut lock = self.messages.lock().unwrap();
        return lock.pop_front();
    }

    pub fn is_empty(&self) -> bool {
        let lock = self.messages.lock().unwrap();
        return lock.is_empty();
    }
}

pub(crate) struct PrivateInbox {
    // "Normal" messages, intended for a PDL protocol. These need to stick
    // around during an entire sync-block (to handle `put`s for which the
    // corresponding `get`s have not yet been reached).
    messages: Vec<(PortIdLocal, DataMessage)>,
    len_read: usize,
}

impl PrivateInbox {
    pub fn new() -> Self {
        Self{
            messages: Vec::new(),
            len_read: 0,
        }
    }

    /// Will insert the message into the inbox. Only exception is when the tuple
    /// (prev_branch_id, cur_branch_id, receiving_port_id) already exists, then
    /// nothing is inserted..
    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, message: DataMessage) {
        for (existing_target_port, existing) in self.messages.iter() {
            if existing.sender_prev_branch_id == message.sender_prev_branch_id &&
                    existing.sender_cur_branch_id == message.sender_cur_branch_id &&
                    *existing_target_port == target_port {
                // Message was already received
                return;
            }
        }

        self.messages.push((target_port, message));
    }

    /// Retrieves all previously read messages that satisfy the provided
    /// speculative conditions. Note that the inbox remains read-locked until
    /// the returned iterator is dropped. Should only be called by the
    /// inbox-reader (i.e. the thread executing a connector's PDL code).
    ///
    /// This function should only be used to check if already-received messages
    /// could be received by a newly encountered `get` call in a connector's
    /// PDL code.
    pub(crate) fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
        return InboxMessageIter {
            messages: &self.messages,
            next_index: 0,
            max_index: self.len_read,
            match_port_id: port_id,
            match_prev_branch_id: prev_branch_id,
        };
    }

    /// Retrieves the next unread message. Should only be called by the
    /// inbox-reader.
    pub(crate) fn next_message(&mut self) -> Option<&DataMessage> {
        if self.len_read == self.messages.len() {
            return None;
        }

        let (_, to_return) = &self.messages[self.len_read];
        self.len_read += 1;
        return Some(to_return);
    }

    /// Simply empties the inbox
    pub(crate) fn clear(&mut self) {
        self.messages.clear();
        self.len_read = 0;
    }
}

/// Iterator over previously received messages in the inbox.
pub(crate) struct InboxMessageIter<'i> {
    messages: &'i Vec<(PortIdLocal, DataMessage)>,
    next_index: usize,
    max_index: usize,
    match_port_id: PortIdLocal,
    match_prev_branch_id: BranchId,
}

impl<'i> Iterator for InboxMessageIter<'i> {
    type Item = &'i DataMessage;

    fn next(&mut self) -> Option<Self::Item> {
        // Loop until match is found or at end of messages
        while self.next_index < self.max_index {
            let (target_port, cur_message) = &self.messages[self.next_index];
            if *target_port == self.match_port_id && cur_message.sender_prev_branch_id == self.match_prev_branch_id {
                // Found a match
                break;
            }

            self.next_index += 1;
        }

        if self.next_index == self.max_index {
            return None;
        }

        let (_, message) = &self.messages[self.next_index];
        self.next_index += 1;
        return Some(message);
    }
}