Files @ 8a530d2dc72f
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox.rs - annotation

8a530d2dc72f 12.3 KiB application/rls-services+xml Show Source Show as Raw Download as Raw
mh
basic message passing sync-resolving
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
daf15df0f8ca
daf15df0f8ca
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
8a530d2dc72f
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8a530d2dc72f
8a530d2dc72f
8a530d2dc72f
8a530d2dc72f
8a530d2dc72f
8a530d2dc72f
daf15df0f8ca
daf15df0f8ca
cf26538b25dc
daf15df0f8ca
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8a530d2dc72f
58dfabd1be9f
58dfabd1be9f
c97c5d60bc61
c97c5d60bc61
58dfabd1be9f
58dfabd1be9f
8a530d2dc72f
58dfabd1be9f
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
58dfabd1be9f
58dfabd1be9f
8a530d2dc72f
58dfabd1be9f
c97c5d60bc61
c97c5d60bc61
58dfabd1be9f
58dfabd1be9f
8a530d2dc72f
58dfabd1be9f
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
8a530d2dc72f
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
c97c5d60bc61
58dfabd1be9f
58dfabd1be9f
8a530d2dc72f
8a530d2dc72f
8a530d2dc72f
8a530d2dc72f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
58dfabd1be9f
58dfabd1be9f
8a530d2dc72f
58dfabd1be9f
58dfabd1be9f
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
cf26538b25dc
cf26538b25dc
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
daf15df0f8ca
8c5d438b0fa3
8c5d438b0fa3
daf15df0f8ca
8c5d438b0fa3
cf26538b25dc
daf15df0f8ca
daf15df0f8ca
cf26538b25dc
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
daf15df0f8ca
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
8c5d438b0fa3
daf15df0f8ca
daf15df0f8ca
8c5d438b0fa3
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
daf15df0f8ca
daf15df0f8ca
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
8c5d438b0fa3
daf15df0f8ca
8c5d438b0fa3
8c5d438b0fa3
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
daf15df0f8ca
cf26538b25dc
cf26538b25dc
cf26538b25dc
cf26538b25dc
/**
inbox.rs

Contains various types of inboxes and message types for the connectors. There
are two kinds of inboxes:

The `PublicInbox` is a simple message queue. Messages are put in by various
threads, and they're taken out by a single thread. These messages may contain
control messages and may be filtered or redirected by the scheduler.

The `PrivateInbox` is a temporary storage for all messages that are received
within a certain sync-round.
**/

use std::collections::VecDeque;
use std::sync::{RwLock, RwLockReadGuard, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::protocol::eval::ValueGroup;
use super::connector::{BranchId, PortIdLocal};
use super::global_store::ConnectorId;

/// A data message prepared by a connector, waiting to be picked up by the
/// runtime to be sent to another connector. Compared to `DataMessage` it does
/// not yet carry the sending connector or the receiving port.
#[derive(Clone)]
pub struct OutgoingDataMessage {
    /// Local ID of the port the message is sent through.
    pub sending_port: PortIdLocal,
    /// Previous branch ID of the sender (see trailing note on validity).
    pub sender_prev_branch_id: BranchId, // may be invalid, implying no prev branch id
    /// Current branch ID of the sender.
    pub sender_cur_branch_id: BranchId, // always valid
    /// The values being transferred.
    pub message: ValueGroup,
}

/// The kinds of message that can be queued as outgoing: a data message, a
/// sync message or a solution message.
// NOTE(review): presumably produced by a connector and consumed by the
// scheduler, like `OutgoingDataMessage` - confirm against the callers.
pub enum OutgoingMessage {
    Data(OutgoingDataMessage),
    Sync(SyncMessage),
    Solution(SolutionMessage),
}

/// A message that has been delivered (after being imbued with the receiving
/// port by the scheduler) to a connector.
#[derive(Clone)]
pub struct DataMessage {
    /// ID of the connector that sent the message.
    pub sending_connector: ConnectorId,
    /// Local ID of the port the message was sent through.
    pub sending_port: PortIdLocal,
    /// Local ID of the port the message arrives at. `PrivateInbox` uses it
    /// for duplicate detection and for matching stored messages.
    pub receiving_port: PortIdLocal,
    /// Previous branch ID of the sender. Used by `InboxMessageIter` to select
    /// the messages that match a given branch.
    pub sender_prev_branch_id: BranchId,
    /// Current branch ID of the sender.
    pub sender_cur_branch_id: BranchId,
    /// The values being transferred.
    pub message: ValueGroup,
}

/// A single constraint placed on a connector's local solution. The meaning of
/// each variant follows from how `SyncMessage::check_constraint` tests it.
#[derive(Clone)]
pub enum SyncBranchConstraint {
    /// The port must be silent: its entry in the solution's final port
    /// mapping must hold an invalid branch ID.
    SilentPort(PortIdLocal),
    /// The branch ID must be among the solution's execution branch IDs.
    BranchNumber(BranchId),
    /// The port must be mapped to exactly this branch ID in the solution's
    /// final port mapping.
    PortMapping(PortIdLocal, BranchId),
}

/// The local solution a single connector contributes to a `SyncMessage`.
#[derive(Clone)]
pub struct SyncConnectorSolution {
    /// The connector this solution belongs to.
    pub connector_id: ConnectorId,
    // NOTE(review): presumably the branch that reached the end of the sync
    // block - confirm against the connector implementation.
    pub terminating_branch_id: BranchId,
    /// Branch IDs that `SyncBranchConstraint::BranchNumber` constraints are
    /// checked against.
    pub execution_branch_ids: Vec<BranchId>, // no particular ordering of IDs enforced
    /// Pairs of (port, branch ID). An invalid branch ID marks the port as
    /// silent (see `SyncMessage::check_constraint`).
    pub final_port_mapping: Vec<(PortIdLocal, BranchId)>
}

/// All constraints collected so far for one connector that has not yet
/// provided a local solution (see `SyncMessage::add_constraint`).
#[derive(Clone)]
pub struct SyncConnectorConstraints {
    /// The connector the constraints apply to.
    pub connector_id: ConnectorId,
    /// The constraints, in the order in which they were added.
    pub constraints: Vec<SyncBranchConstraint>,
}

/// A message that collects local solutions and outstanding constraints of the
/// connectors taking part in a sync round.
#[derive(Clone)]
pub struct SyncMessage {
    /// Local solutions provided so far, at most one per connector is expected.
    pub local_solutions: Vec<SyncConnectorSolution>,
    /// Constraints for connectors that have not provided a local solution yet.
    pub constraints: Vec<SyncConnectorConstraints>,
    /// Connectors that have constraints and still have to be visited. Every
    /// connector is pushed when its first constraint is added.
    pub to_visit: Vec<ConnectorId>,
}

// TODO: Shouldn't really be here, right?
impl SyncMessage {
    /// Creates a new sync message. Assumes that it is created by a connector
    /// that has just encountered a new local solution.
    ///
    /// `approximate_peers` is only used as a capacity hint for the internal
    /// vectors; it does not limit the number of peers.
    pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self {
        let mut local_solutions = Vec::with_capacity(approximate_peers);
        local_solutions.push(initial_solution);

        Self{
            local_solutions,
            constraints: Vec::with_capacity(approximate_peers),
            to_visit: Vec::with_capacity(approximate_peers),
        }
    }

    /// Checks if a connector has already provided a local solution.
    pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool {
        self.local_solutions
            .iter()
            .any(|v| v.connector_id == connector_id)
    }

    /// Adds a new constraint. If the connector has already provided a local
    /// solution then the constraint will be checked. Otherwise the constraint
    /// will be added to the solution. If this is the first constraint for a
    /// connector then it will be added to the connectors that still have to be
    /// visited.
    ///
    /// Returns `Ok(true)` if the constraint was added, or if the local
    /// solution for the specified connector satisfies the constraint, and
    /// `Ok(false)` if the local solution violates it. If this function returns
    /// an error then we're dealing with a nefarious peer.
    pub(crate) fn add_or_check_constraint(
        &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint
    ) -> Result<bool, ()> {
        if self.has_local_solution_for(connector_id) {
            self.check_constraint(connector_id, constraint)
        } else {
            self.add_constraint(connector_id, constraint);
            Ok(true)
        }
    }

    /// Pushes a new connector constraint. Caller must ensure that the solution
    /// has not yet arrived at the specified connector (because then it would no
    /// longer have constraints, but a proposed solution instead).
    pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) {
        debug_assert!(!self.has_local_solution_for(connector_id));

        let position = self.constraints
            .iter()
            .position(|v| v.connector_id == connector_id);

        match position {
            Some(index) => {
                // Has pre-existing constraints, so the connector must already
                // be scheduled for a visit.
                debug_assert!(self.to_visit.contains(&connector_id));
                self.constraints[index].constraints.push(constraint);
            },
            None => {
                // First constraint for this connector: register it and make
                // sure the connector will be visited.
                debug_assert!(!self.to_visit.contains(&connector_id));
                self.constraints.push(SyncConnectorConstraints{
                    connector_id,
                    constraints: vec![constraint],
                });
                self.to_visit.push(connector_id);
            }
        }
    }

    /// Checks if a constraint is satisfied by a solution. Caller must make sure
    /// that a local solution has already been provided. Will return an error
    /// value only if the provided constraint does not make sense (i.e. a
    /// nefarious peer has supplied a constraint with a port we do not own).
    ///
    /// # Panics
    ///
    /// Panics if no local solution exists for `connector_id`.
    pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result<bool, ()>  {
        let entry = self.local_solutions
            .iter()
            .find(|v| v.connector_id == connector_id)
            .expect("caller must ensure a local solution exists for the connector");

        match constraint {
            SyncBranchConstraint::SilentPort(silent_port_id) => {
                // If silent, then the mapped ID is invalid. A port that does
                // not appear in the mapping is not owned by the connector.
                entry.final_port_mapping
                    .iter()
                    .find(|(mapped_port_id, _)| *mapped_port_id == silent_port_id)
                    .map(|(_, mapped_id)| !mapped_id.is_valid())
                    .ok_or(())
            },
            SyncBranchConstraint::BranchNumber(expected_branch_id) => {
                Ok(entry.execution_branch_ids.contains(&expected_branch_id))
            },
            SyncBranchConstraint::PortMapping(port_id, expected_branch_id) => {
                // The loop variable must not shadow the constraint's port:
                // only the entry of the requested port may be compared.
                entry.final_port_mapping
                    .iter()
                    .find(|(mapped_port_id, _)| *mapped_port_id == port_id)
                    .map(|(_, mapped_id)| *mapped_id == expected_branch_id)
                    .ok_or(())
            },
        }
    }
}

/// A solution message; according to the `Message` enum it finishes a sync
/// round.
pub struct SolutionMessage {
    // NOTE(review): presumably the branch each connector has to commit to in
    // the agreed-upon solution - confirm against the code handling this message.
    pub local_solutions: Vec<(ConnectorId, BranchId)>,
}

/// A control message. These might be sent by the schedulers to notify each
/// other of asynchronous state changes.
pub struct ControlMessage {
    pub id: u32, // generic identifier, used to match request to response
    /// The connector that sent this control message.
    pub sender: ConnectorId,
    /// What the control message is about.
    pub content: ControlMessageVariant,
}

/// The payload of a `ControlMessage`.
pub enum ControlMessageVariant {
    ChangePortPeer(PortIdLocal, ConnectorId), // specified port has a new peer, sent to owner of said port
    Ack, // acknowledgement of previous control message, matching occurs through control message ID.
}

/// Generic message in the `PublicInbox`, handled by the scheduler (which takes
/// out and handles all control messages and potential routing). The correctly
/// addressed `Data` variants will end up at the connector.
pub enum Message {
    Data(DataMessage),          // data message, handled by connector
    Sync(SyncMessage),          // sync message, handled by both connector/scheduler
    Solution(SolutionMessage),  // solution message, finishing a sync round
    Control(ControlMessage),    // control message, handled by scheduler
    Ping,                       // ping message, intentionally waking up a connector (used for native connectors)
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    // FIFO queue of pending messages; the mutex is taken for every operation.
    messages: Mutex<VecDeque<Message>>,
}

impl PublicInbox {
    /// Creates an inbox whose message queue is empty.
    pub fn new() -> Self {
        let messages = Mutex::new(VecDeque::new());
        Self{ messages }
    }

    /// Appends `message` to the back of the queue.
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn insert_message(&self, message: Message) {
        self.messages.lock().unwrap().push_back(message);
    }

    /// Removes and returns the message at the front of the queue, or `None`
    /// when the queue is empty.
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn take_message(&self) -> Option<Message> {
        self.messages.lock().unwrap().pop_front()
    }

    /// Returns `true` when no messages are queued at the moment of the call.
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        self.messages.lock().unwrap().is_empty()
    }
}

/// Storage for the data messages a connector has received. Messages are kept
/// until `clear` is called, so that they can be matched against later on.
pub struct PrivateInbox {
    // "Normal" messages, intended for a PDL protocol. These need to stick
    // around during an entire sync-block (to handle `put`s for which the
    // corresponding `get`s have not yet been reached).
    messages: Vec<DataMessage>,
    // Number of messages, counted from the start of `messages`, that have
    // already been handed out by `next_message`.
    len_read: usize,
}

impl PrivateInbox {
    /// Creates an inbox without any messages, read or unread.
    pub fn new() -> Self {
        Self{ messages: Vec::new(), len_read: 0 }
    }

    /// Stores `message`, unless a message with the same
    /// (prev_branch_id, cur_branch_id, receiving_port_id) tuple was already
    /// stored. In that case the inbox is left untouched.
    pub fn insert_message(&mut self, message: DataMessage) {
        let already_received = self.messages.iter().any(|stored| {
            stored.sender_prev_branch_id == message.sender_prev_branch_id
                && stored.sender_cur_branch_id == message.sender_cur_branch_id
                && stored.receiving_port == message.receiving_port
        });

        if !already_received {
            self.messages.push(message);
        }
    }

    /// Returns an iterator over the already-read messages (those handed out
    /// by `next_message`) that arrived at `port_id` and whose sender's
    /// previous branch ID equals `prev_branch_id`. The iterator borrows the
    /// inbox for as long as it lives. Should only be called by the
    /// inbox-reader (i.e. the thread executing a connector's PDL code).
    ///
    /// Intended for checking whether already-received messages could be
    /// received by a newly encountered `get` call in a connector's PDL code.
    pub fn get_messages(&self, port_id: PortIdLocal, prev_branch_id: BranchId) -> InboxMessageIter {
        InboxMessageIter{
            match_port_id: port_id,
            match_prev_branch_id: prev_branch_id,
            messages: &self.messages,
            next_index: 0,
            max_index: self.len_read,
        }
    }

    /// Hands out the oldest message that has not been read yet and marks it
    /// as read. Returns `None` when every stored message was already read.
    /// Should only be called by the inbox-reader.
    pub fn next_message(&mut self) -> Option<&DataMessage> {
        let index = self.len_read;
        if index == self.messages.len() {
            return None;
        }

        self.len_read = index + 1;
        Some(&self.messages[index])
    }

    /// Removes all messages and resets the read position.
    pub fn clear(&mut self) {
        self.len_read = 0;
        self.messages.clear();
    }
}

/// Iterator over previously received messages in the inbox. Only yields the
/// messages that match both the port and the previous branch ID it was
/// created with (see `PrivateInbox::get_messages`).
pub struct InboxMessageIter<'i> {
    // All messages stored in the inbox the iterator was created from.
    messages: &'i Vec<DataMessage>,
    // Index of the next message to inspect.
    next_index: usize,
    // Exclusive upper bound for `next_index`; set to the number of already
    // read messages when the iterator is created.
    max_index: usize,
    // Only messages whose `receiving_port` equals this port are yielded.
    match_port_id: PortIdLocal,
    // Only messages whose `sender_prev_branch_id` equals this ID are yielded.
    match_prev_branch_id: BranchId,
}

impl<'i> Iterator for InboxMessageIter<'i> {
    // The yielded references borrow from the inbox (lifetime `'i`), not from
    // the iterator itself, so they stay usable after further `next` calls.
    type Item = &'i DataMessage;

    /// Returns the next message below `max_index` whose receiving port and
    /// sender's previous branch ID both match the values this iterator was
    /// created with, or `None` once all candidate messages were inspected.
    fn next(&mut self) -> Option<Self::Item> {
        // Copy the shared reference out of `self`, such that the returned
        // message is tied to `'i` instead of to the `&mut self` borrow.
        let messages: &'i Vec<DataMessage> = self.messages;

        // Loop until match is found or at end of messages
        while self.next_index < self.max_index {
            let candidate = &messages[self.next_index];
            self.next_index += 1;

            if candidate.receiving_port == self.match_port_id
                    && candidate.sender_prev_branch_id == self.match_prev_branch_id {
                // Found a match
                return Some(candidate);
            }
        }

        None
    }
}