Files @ 418aa1170154
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox.rs

418aa1170154 11.9 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
initial sync solution generation
/**
inbox.rs

Contains various types of inboxes and message types for the connectors. There
are two kinds of inboxes:

The `PublicInbox` is a simple message queue. Messages are put in by various
threads, and they're taken out by a single thread. These messages may contain
control messages and may be filtered or redirected by the scheduler.

The `PrivateInbox` is a temporary storage for all messages that are received
within a certain sync-round.
**/

use std::collections::VecDeque;
use std::sync::{RwLock, RwLockReadGuard, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::protocol::eval::ValueGroup;
use super::connector::{BranchId, PortIdLocal};
use super::global_store::ConnectorId;

/// A message prepared by a connector. Waiting to be picked up by the runtime to
/// be sent to another connector.
///
/// Carries the same sender-side fields as `DataMessage`, minus the sending
/// connector and the receiving port.
#[derive(Clone)]
pub struct OutgoingMessage {
    /// Port (local to the sender) on which the message is put.
    pub sending_port: PortIdLocal,
    /// Branch the sender was in before the current one. May be invalid,
    /// implying there is no previous branch ID.
    pub sender_prev_branch_id: BranchId,
    /// Branch the sender is currently in. Always valid.
    pub sender_cur_branch_id: BranchId,
    /// The values being transferred.
    pub message: ValueGroup,
}

/// A message that has been delivered (after being imbued with the receiving
/// port by the scheduler) to a connector.
///
/// `PrivateInbox::insert_message` treats two messages as duplicates when their
/// `sender_prev_branch_id`, `sender_cur_branch_id` and `receiving_port` are all
/// equal.
#[derive(Clone)]
pub struct DataMessage {
    /// Connector that sent the message.
    pub sending_connector: ConnectorId,
    /// Port (local to the sender) the message was sent through.
    pub sending_port: PortIdLocal,
    /// Port (local to the receiver) the message is addressed to; used by
    /// `InboxMessageIter` to select messages for a particular port.
    pub receiving_port: PortIdLocal,
    /// Sender's previous branch ID; `InboxMessageIter` matches on this field.
    pub sender_prev_branch_id: BranchId,
    /// Sender's branch ID at the time of sending.
    pub sender_cur_branch_id: BranchId,
    /// The values being transferred.
    pub message: ValueGroup,
}

/// A constraint that one connector places on the local solution of another
/// connector. Evaluated against a `SyncConnectorSolution` by
/// `SyncMessage::check_constraint`.
pub enum SyncBranchConstraint {
    /// The port must be silent: it must appear in the solution's final port
    /// mapping with an invalid branch ID.
    SilentPort(PortIdLocal),
    /// The branch ID must be one of the solution's execution branch IDs.
    BranchNumber(BranchId),
    /// The port must be mapped to the given branch ID in the solution's final
    /// port mapping.
    PortMapping(PortIdLocal, BranchId),
}

/// The local solution a single connector contributes to a `SyncMessage`.
pub struct SyncConnectorSolution {
    /// Connector that produced this local solution.
    pub connector_id: ConnectorId,
    /// Presumably the branch in which the connector finished its sync block
    /// (inferred from the name; not used within this file).
    pub terminating_branch_id: BranchId,
    /// Branch IDs checked by `SyncBranchConstraint::BranchNumber`. No
    /// particular ordering of IDs is enforced.
    pub execution_branch_ids: Vec<BranchId>,
    /// Per-port branch IDs checked by the `SilentPort` and `PortMapping`
    /// constraints. An invalid branch ID marks the port as silent.
    pub final_port_mapping: Vec<(PortIdLocal, BranchId)>
}

/// All constraints collected so far for a connector that has not yet provided
/// its local solution.
pub struct SyncConnectorConstraints {
    /// Connector the constraints apply to.
    pub connector_id: ConnectorId,
    /// Constraints in the order in which they were added.
    pub constraints: Vec<SyncBranchConstraint>,
}

/// Message used to build up a solution for a sync round. It accumulates the
/// local solutions of connectors together with the constraints on connectors
/// that have not provided a local solution yet.
pub struct SyncMessage {
    /// Local solutions provided so far, one entry per connector.
    pub local_solutions: Vec<SyncConnectorSolution>,
    /// Constraints for connectors without a local solution; one entry per
    /// connector.
    pub constraints: Vec<SyncConnectorConstraints>,
    /// Connectors that have constraints attached and therefore still have to
    /// be visited.
    pub to_visit: Vec<ConnectorId>,
}

// TODO: Shouldn't really be here, right?
impl SyncMessage {
    /// Creates a new sync message. Assumes that it is created by a connector
    /// that has just encountered a new local solution.
    ///
    /// `approximate_peers` is only used as a capacity hint for the internal
    /// vectors.
    pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self {
        let mut local_solutions = Vec::with_capacity(approximate_peers);
        local_solutions.push(initial_solution);

        Self {
            local_solutions,
            constraints: Vec::with_capacity(approximate_peers),
            to_visit: Vec::with_capacity(approximate_peers),
        }
    }

    /// Checks if a connector has already provided a local solution.
    pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool {
        self.local_solutions
            .iter()
            .any(|v| v.connector_id == connector_id)
    }

    /// Adds a new constraint, or checks it if the connector has already
    /// provided a local solution. If this is the first constraint for a
    /// connector then the connector is also added to the connectors that
    /// still have to be visited.
    ///
    /// Returns `Ok(true)` if the constraint was added, or if the local
    /// solution for the specified connector satisfies the constraint, and
    /// `Ok(false)` if the local solution violates it. If this function returns
    /// an error then we're dealing with a nefarious peer.
    pub(crate) fn add_or_check_constraint(
        &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint
    ) -> Result<bool, ()> {
        if self.has_local_solution_for(connector_id) {
            self.check_constraint(connector_id, constraint)
        } else {
            self.add_constraint(connector_id, constraint);
            Ok(true)
        }
    }

    /// Pushes a new connector constraint. Caller must ensure that the solution
    /// has not yet arrived at the specified connector (because then it would no
    /// longer have constraints, but a proposed solution instead).
    pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) {
        debug_assert!(!self.has_local_solution_for(connector_id));

        let position = self.constraints
            .iter()
            .position(|v| v.connector_id == connector_id);

        match position {
            Some(index) => {
                // Has pre-existing constraints, so the connector must already
                // be scheduled for a visit.
                debug_assert!(self.to_visit.contains(&connector_id));
                self.constraints[index].constraints.push(constraint);
            },
            None => {
                // First constraint for this connector: register it and
                // schedule the connector for a visit.
                debug_assert!(!self.to_visit.contains(&connector_id));
                self.constraints.push(SyncConnectorConstraints{
                    connector_id,
                    constraints: vec![constraint],
                });
                self.to_visit.push(connector_id);
            }
        }
    }

    /// Checks if a constraint is satisfied by a solution. Caller must make sure
    /// that a local solution has already been provided. Will return an error
    /// value only if the provided constraint does not make sense (i.e. a
    /// nefarious peer has supplied a constraint with a port we do not own).
    ///
    /// # Panics
    ///
    /// Panics if no local solution exists for `connector_id`.
    pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result<bool, ()>  {
        debug_assert!(self.has_local_solution_for(connector_id));

        let entry = self.local_solutions
            .iter()
            .find(|v| v.connector_id == connector_id)
            .expect("caller guarantees a local solution for this connector");

        match constraint {
            SyncBranchConstraint::SilentPort(silent_port_id) => {
                // If silent, then the mapped ID is invalid. A port that does
                // not occur in the mapping is not owned by the connector.
                entry.final_port_mapping
                    .iter()
                    .find(|(port_id, _)| *port_id == silent_port_id)
                    .map(|(_, mapped_id)| !mapped_id.is_valid())
                    .ok_or(())
            },
            SyncBranchConstraint::BranchNumber(expected_branch_id) => {
                Ok(entry.execution_branch_ids.contains(&expected_branch_id))
            },
            SyncBranchConstraint::PortMapping(expected_port_id, expected_branch_id) => {
                // Look up the entry for the *requested* port; the names are
                // kept distinct so the loop variable cannot shadow it.
                entry.final_port_mapping
                    .iter()
                    .find(|(port_id, _)| *port_id == expected_port_id)
                    .map(|(_, mapped_id)| *mapped_id == expected_branch_id)
                    .ok_or(())
            },
        }
    }
}

/// A control message. These might be sent by schedulers to notify each other
/// of asynchronous state changes.
pub struct ControlMessage {
    /// Generic identifier, used to match a request to its response.
    pub id: u32,
    /// Connector on whose behalf the message was sent.
    pub sender: ConnectorId,
    /// What the control message asks for or acknowledges.
    pub content: ControlMessageVariant,
}

/// The payload of a `ControlMessage`.
pub enum ControlMessageVariant {
    /// The specified port has a new peer. Sent to the owner of said port.
    ChangePortPeer(PortIdLocal, ConnectorId),
    /// Acknowledgement of a previous control message; matching occurs through
    /// the control message ID.
    Ack,
}

/// Generic message in the `PublicInbox`, handled by the scheduler (which takes
/// out and handles all control messages and potential routing). The correctly
/// addressed `Data` variants will end up at the connector.
pub enum Message {
    Data(DataMessage),          // data message, handled by connector
    Sync(SyncMessage),          // sync message, handled by both connector and scheduler
    Control(ControlMessage),    // control message, handled by scheduler
    Ping,                       // ping message, intentionally waking up a connector (used for native connectors)
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    // Queue of pending messages: pushed at the back, popped from the front.
    // Every access goes through the mutex.
    messages: Mutex<VecDeque<Message>>,
}

impl PublicInbox {
    /// Constructs an inbox that holds no messages.
    pub fn new() -> Self {
        let messages = Mutex::new(VecDeque::new());
        Self { messages }
    }

    /// Appends `message` to the back of the queue.
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn insert_message(&self, message: Message) {
        let mut queue = self.messages.lock().unwrap();
        queue.push_back(message);
    }

    /// Removes and returns the oldest message, or `None` if the inbox is
    /// empty.
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn take_message(&self) -> Option<Message> {
        let mut queue = self.messages.lock().unwrap();
        queue.pop_front()
    }

    /// Returns `true` if there are no messages waiting in the inbox.
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        let queue = self.messages.lock().unwrap();
        queue.is_empty()
    }
}

/// Storage for the data messages a connector has received. Messages are kept
/// until `clear` is called; reading a message only advances a cursor.
pub struct PrivateInbox {
    // "Normal" messages, intended for a PDL protocol. These need to stick
    // around during an entire sync-block (to handle `put`s for which the
    // corresponding `get`s have not yet been reached).
    messages: Vec<DataMessage>,
    // Number of messages at the front of `messages` that have already been
    // handed out by `next_message`. `get_messages` only iterates over these.
    len_read: usize,
}

impl PrivateInbox {
    /// Constructs an empty inbox in which nothing has been read yet.
    pub fn new() -> Self {
        Self { len_read: 0, messages: Vec::new() }
    }

    /// Stores `message`, unless a message with the same combination of
    /// (prev_branch_id, cur_branch_id, receiving_port_id) is already present;
    /// in that case the new message is dropped.
    pub fn insert_message(&mut self, message: DataMessage) {
        let already_received = self.messages.iter().any(|stored| {
            stored.sender_prev_branch_id == message.sender_prev_branch_id
                && stored.sender_cur_branch_id == message.sender_cur_branch_id
                && stored.receiving_port == message.receiving_port
        });

        if !already_received {
            self.messages.push(message);
        }
    }

    /// Returns an iterator over the messages that were already read (through
    /// `next_message`) and that match the provided port and previous branch
    /// ID. The iterator borrows the inbox for as long as it is alive. Should
    /// only be called by the inbox-reader (i.e. the thread executing a
    /// connector's PDL code).
    ///
    /// Intended for checking whether an already-received message could be
    /// received by a newly encountered `get` call in a connector's PDL code.
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
        InboxMessageIter {
            match_port_id: port_id,
            match_prev_branch_id: prev_branch_id,
            messages: &self.messages,
            next_index: 0,
            // Only the already-read prefix of the inbox is visited.
            max_index: self.len_read,
        }
    }

    /// Hands out the next unread message and marks it as read, or returns
    /// `None` when every message has been read. Should only be called by the
    /// inbox-reader.
    pub fn next_message(&mut self) -> Option<&DataMessage> {
        let unread = self.messages.get(self.len_read)?;
        self.len_read += 1;
        Some(unread)
    }

    /// Removes all messages and resets the read cursor.
    pub fn clear(&mut self) {
        self.len_read = 0;
        self.messages.clear();
    }
}

/// Iterator over previously received messages in the inbox. Created by
/// `PrivateInbox::get_messages`.
pub struct InboxMessageIter<'i> {
    // All messages stored in the inbox this iterator was created from.
    messages: &'i Vec<DataMessage>,
    // Index of the next message to inspect.
    next_index: usize,
    // Exclusive upper bound on the indices to inspect (the number of messages
    // that had been read when the iterator was created).
    max_index: usize,
    // Only messages with this receiving port are considered a match.
    match_port_id: PortIdLocal,
    // Only messages with this sender's previous branch ID are considered a match.
    match_prev_branch_id: BranchId,
}

/// Yields, in storage order, every message in the range `[0, max_index)` whose
/// receiving port and sender's previous branch ID match the values the
/// iterator was created with.
///
/// Items borrow from the inbox (lifetime `'i`), not from the iterator itself,
/// so they remain usable after the iterator has been advanced or dropped.
impl<'i> Iterator for InboxMessageIter<'i> {
    type Item = &'i DataMessage;

    fn next(&mut self) -> Option<Self::Item> {
        // Copy the shared reference out of `self`, such that the yielded
        // message is tied to the inbox's lifetime instead of to `&mut self`.
        let messages: &'i Vec<DataMessage> = self.messages;

        // Loop until match is found or at end of messages
        while self.next_index < self.max_index {
            let candidate = &messages[self.next_index];
            self.next_index += 1;

            if candidate.receiving_port == self.match_port_id
                && candidate.sender_prev_branch_id == self.match_prev_branch_id
            {
                return Some(candidate);
            }
        }

        None
    }
}