Files @ ecc47971d535
Branch filter:

Location: CSY/reowolf/src/runtime2/inbox.rs

ecc47971d535 8.8 KiB application/rls-services+xml Show Annotation Show as Raw Download as Raw
MH
WIP on handling sync solution messages
/**
inbox.rs

Contains various types of inboxes and message types for the connectors. There
are two kinds of inboxes:

The `PublicInbox` is a simple message queue. Messages are put in by various
threads, and they're taken out by a single thread. These messages may contain
control messages and may be filtered or redirected by the scheduler.

The `PrivateInbox` is a temporary storage for all messages that are received
within a certain sync-round.
**/

use std::collections::VecDeque;
use std::sync::Mutex;

use super::ConnectorId;
use crate::protocol::eval::ValueGroup;
use crate::runtime2::inbox2::MessageFancy;
use super::connector::BranchId;
use super::port::PortIdLocal;

/// A message that has been delivered (after being imbued with the receiving
/// port by the scheduler) to a connector.
///
/// NOTE(review): the per-field notes below are inferred from the field names
/// and types only; nothing in this file reads these fields. Confirm against
/// the code that constructs and consumes data messages.
// TODO: Remove Debug on messages
#[derive(Debug, Clone)]
pub struct DataMessage {
    /// Presumably the local ID of the port the message was sent through.
    pub sending_port: PortIdLocal,
    /// Presumably the sender's branch ID before it performed the send.
    pub sender_prev_branch_id: BranchId,
    /// Presumably the sender's branch ID after it performed the send.
    pub sender_cur_branch_id: BranchId,
    /// The payload carried by this message.
    pub message: ValueGroup,
}

/// A single constraint placed on a connector's local solution. Constraints are
/// evaluated against a `SyncConnectorSolution` by
/// `SyncMessage::check_constraint`.
#[derive(Debug, Clone)]
pub enum SyncBranchConstraint {
    /// The port must be silent: its entry in the solution's final port mapping
    /// must carry a branch ID that is not valid.
    SilentPort(PortIdLocal),
    /// The branch ID must appear among the solution's execution branch IDs.
    BranchNumber(BranchId),
    /// The port is expected to be mapped to the given branch ID in the
    /// solution's final port mapping.
    PortMapping(PortIdLocal, BranchId),
}

/// The local solution that one connector contributes to a `SyncMessage`.
#[derive(Debug, Clone)]
pub struct SyncConnectorSolution {
    /// Connector this solution belongs to; used to look the solution up.
    pub connector_id: ConnectorId,
    /// NOTE(review): presumably the branch in which the connector finished its
    /// sync round; not read anywhere in this file, confirm against callers.
    pub terminating_branch_id: BranchId,
    /// Branch IDs against which `BranchNumber` constraints are checked.
    pub execution_branch_ids: Vec<BranchId>, // no particular ordering of IDs enforced
    /// Pairs of (port, mapped branch ID). A branch ID that is not valid marks
    /// the port as silent.
    pub final_port_mapping: Vec<(PortIdLocal, BranchId)>
}

/// The constraints collected so far for a connector that has not yet provided
/// a local solution to the `SyncMessage` holding this entry.
#[derive(Debug, Clone)]
pub struct SyncConnectorConstraints {
    /// Connector the constraints apply to.
    pub connector_id: ConnectorId,
    /// Constraints in the order in which they were added.
    pub constraints: Vec<SyncBranchConstraint>,
}

/// A sync message: carries the local solutions gathered so far together with
/// the constraints that still have to be satisfied by other connectors.
#[derive(Debug, Clone)]
pub struct SyncMessage {
    /// Local solutions that connectors have already provided.
    pub local_solutions: Vec<SyncConnectorSolution>,
    /// Outstanding constraints, grouped per connector.
    pub constraints: Vec<SyncConnectorConstraints>,
    /// Connectors that still have to be visited. A connector is added here
    /// when its first constraint is added.
    pub to_visit: Vec<ConnectorId>,
}

// TODO: Shouldn't really be here, right?
impl SyncMessage {
    /// Creates a new sync message. Assumes that it is created by a connector
    /// that has just encountered a new local solution.
    ///
    /// `approximate_peers` is only used as a capacity hint for the internal
    /// vectors; it does not limit the number of entries that can be stored.
    pub(crate) fn new(initial_solution: SyncConnectorSolution, approximate_peers: usize) -> Self {
        let mut local_solutions = Vec::with_capacity(approximate_peers);
        local_solutions.push(initial_solution);

        return Self{
            local_solutions,
            constraints: Vec::with_capacity(approximate_peers),
            to_visit: Vec::with_capacity(approximate_peers),
        };
    }

    /// Checks if a connector has already provided a local solution
    pub(crate) fn has_local_solution_for(&self, connector_id: ConnectorId) -> bool {
        return self.local_solutions
            .iter()
            .any(|v| v.connector_id == connector_id);
    }

    /// Adds a new constraint. If the connector has already provided a local
    /// solution then the constraint will be checked. Otherwise the constraint
    /// will be added to the solution. If this is the first constraint for a
    /// connector then it will be added to the connectors that still have to be
    /// visited.
    ///
    /// If this returns true then the constraint was added, or the local
    /// solution for the specified connector satisfies the constraint. If this
    /// function returns an error then we're dealing with a nefarious peer.
    pub(crate) fn add_or_check_constraint(
        &mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint
    ) -> Result<bool, ()> {
        if self.has_local_solution_for(connector_id) {
            return self.check_constraint(connector_id, constraint);
        }

        self.add_constraint(connector_id, constraint);
        return Ok(true);
    }

    /// Pushes a new connector constraint. Caller must ensure that the solution
    /// has not yet arrived at the specified connector (because then it would no
    /// longer have constraints, but a proposed solution instead).
    ///
    /// The first constraint for a connector also registers that connector in
    /// `to_visit`.
    pub(crate) fn add_constraint(&mut self, connector_id: ConnectorId, constraint: SyncBranchConstraint) {
        debug_assert!(!self.has_local_solution_for(connector_id));

        let existing = self.constraints
            .iter_mut()
            .find(|v| v.connector_id == connector_id);

        match existing {
            Some(entry) => {
                // Has pre-existing constraints, so the connector must already
                // be scheduled for a visit.
                debug_assert!(self.to_visit.contains(&connector_id));
                entry.constraints.push(constraint);
            },
            None => {
                debug_assert!(!self.to_visit.contains(&connector_id));
                self.constraints.push(SyncConnectorConstraints{
                    connector_id,
                    constraints: vec![constraint],
                });
                self.to_visit.push(connector_id);
            }
        }
    }

    /// Checks if a constraint is satisfied by a solution. Caller must make sure
    /// that a local solution has already been provided. Will return an error
    /// value only if the provided constraint does not make sense (i.e. a
    /// nefarious peer has supplied a constraint with a port we do not own).
    ///
    /// # Panics
    ///
    /// Panics if no local solution is present for `connector_id`.
    pub(crate) fn check_constraint(&self, connector_id: ConnectorId, constraint: SyncBranchConstraint) -> Result<bool, ()>  {
        debug_assert!(self.has_local_solution_for(connector_id));

        let entry = self.local_solutions
            .iter()
            .find(|v| v.connector_id == connector_id)
            .expect("caller must ensure a local solution exists for the connector");

        match constraint {
            SyncBranchConstraint::SilentPort(silent_port_id) => {
                // If silent, then mapped ID is invalid. A port that does not
                // occur in the mapping at all is an error.
                return entry.final_port_mapping
                    .iter()
                    .find(|(port_id, _)| *port_id == silent_port_id)
                    .map(|(_, mapped_id)| !mapped_id.is_valid())
                    .ok_or(());
            },
            SyncBranchConstraint::BranchNumber(expected_branch_id) => {
                return Ok(entry.execution_branch_ids.contains(&expected_branch_id));
            },
            SyncBranchConstraint::PortMapping(expected_port_id, expected_branch_id) => {
                // The constraint's port gets its own name: reusing `port_id`
                // for both the constraint and the mapping entry would make the
                // comparison trivially true and only ever inspect the first
                // mapping entry.
                return entry.final_port_mapping
                    .iter()
                    .find(|(port_id, _)| *port_id == expected_port_id)
                    .map(|(_, mapped_id)| *mapped_id == expected_branch_id)
                    .ok_or(());
            },
        }
    }
}

/// Payload of the `RequestCommit` and `ConfirmCommit` variants of
/// `MessageContents`.
///
/// NOTE(review): none of these fields are read in this file; the per-field
/// notes are inferred from names and types and should be confirmed against the
/// code that handles solution messages.
#[derive(Debug, Clone)]
pub struct SolutionMessage {
    /// Presumably a number used to compare or order competing solutions.
    pub comparison_number: u64,
    /// Presumably the connector that originated this solution message.
    pub connector_origin: ConnectorId,
    /// Pairs of (connector, branch ID); presumably the branch each connector
    /// contributes to the solution.
    pub local_solutions: Vec<(ConnectorId, BranchId)>,
    /// Presumably the connectors that still have to receive this message.
    pub to_visit: Vec<ConnectorId>,
}

/// A control message. These might be sent by the schedulers to notify each
/// other of asynchronous state changes.
#[derive(Debug, Clone)]
pub struct ControlMessage {
    pub id: u32, // generic identifier, used to match request to response
    /// What this control message asks for or acknowledges.
    pub content: ControlMessageVariant,
}

/// The kinds of content a `ControlMessage` can carry.
#[derive(Debug, Clone)]
pub enum ControlMessageVariant {
    /// The specified port has a new peer; sent to the owner of said port.
    ChangePortPeer(PortIdLocal, ConnectorId),
    /// Close the port associated with this message.
    CloseChannel(PortIdLocal),
    /// Acknowledgement of a previous control message; matching occurs through
    /// the control message ID.
    Ack,
}

/// Generic message contents. Each variant wraps the payload for one kind of
/// message; the trailing notes state which component handles it.
#[derive(Debug, Clone)]
pub enum MessageContents {
    Data(DataMessage),              // data message, handled by connector
    Sync(SyncMessage),              // sync message, handled by both connector/scheduler
    RequestCommit(SolutionMessage), // solution message, requesting participants to commit
    ConfirmCommit(SolutionMessage), // solution message, confirming a solution everyone committed to
    Control(ControlMessage),        // control message, handled by scheduler
    Ping,                           // ping message, intentionally waking up a connector (used for native connectors)
}

/// A message together with its addressing information: the connector that
/// sent it and the port it is destined for.
#[derive(Debug)]
pub struct Message {
    /// Connector that sent this message.
    pub sending_connector: ConnectorId,
    pub receiving_port: PortIdLocal, // may be invalid (in case of messages targeted at the connector)
    /// The actual payload.
    pub contents: MessageContents,
}

/// The public inbox of a connector. The thread running the connector that owns
/// this inbox may retrieve messages from it. Non-owning threads may only put
/// new messages inside of it.
// TODO: @Optimize, lazy concurrency. Probably ringbuffer with read/write heads.
//  Should behave as a MPSC queue.
pub struct PublicInbox {
    // Queue of pending messages, guarded by a mutex so that several threads
    // can access it through a shared reference.
    messages: Mutex<VecDeque<MessageFancy>>,
}

impl PublicInbox {
    /// Constructs an inbox that holds no messages.
    pub fn new() -> Self {
        let messages = Mutex::new(VecDeque::new());
        Self{ messages }
    }

    /// Appends `message` to the back of the queue.
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn insert_message(&self, message: MessageFancy) {
        self.messages.lock().unwrap().push_back(message);
    }

    /// Removes and returns the message at the front of the queue, or `None`
    /// when the queue holds no messages.
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn take_message(&self) -> Option<MessageFancy> {
        self.messages.lock().unwrap().pop_front()
    }

    /// Reports whether the queue currently holds no messages.
    ///
    /// Panics if the internal mutex has been poisoned.
    pub fn is_empty(&self) -> bool {
        self.messages.lock().unwrap().is_empty()
    }
}