Changeset - 1b179e5f4579
[Not reviewed]
0 4 0
MH - 4 years ago 2021-11-12 13:07:38
contact@maxhenger.nl
Bugfixes in initial API component
4 files changed with 75 insertions and 11 deletions:
0 comments (0 inline, 0 general)
src/runtime2/branch.rs
Show inline comments
 
@@ -329,212 +329,212 @@ impl<'a> Iterator for BranchParentIter<'a> {
 
    fn next(&mut self) -> Option<Self::Item> {
 
        if self.index == 0 {
 
            return None;
 
        }
 

	
 
        let branch = &self.branches[self.index];
 
        self.index = branch.parent_id.index as usize;
 
        return Some(branch);
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// FakeTree
 
// -----------------------------------------------------------------------------
 

	
 
/// Generic fake branch. This is supposed to be used in conjunction with the
 
/// fake tree. The purpose is to have a branching-like tree to use in
 
/// combination with a consensus algorithm in places where we don't have PDL
 
/// code.
 
pub(crate) struct FakeBranch {
 
    pub id: BranchId,
 
    pub parent_id: BranchId,
 
    pub sync_state: SpeculativeState,
 
    pub awaiting_port: PortIdLocal,
 
    pub next_in_queue: BranchId,
 
    pub inbox: HashMap<PortIdLocal, ValueGroup>,
 
}
 

	
 
impl BranchListItem for FakeBranch {
 
    #[inline] fn get_id(&self) -> BranchId { return self.id; }
 
    #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; }
 
    #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; }
 
}
 

	
 
impl FakeBranch {
 
    fn new_root(_index: u32) -> FakeBranch {
 
        debug_assert!(_index == 1);
 
        return FakeBranch{
 
            id: BranchId::new(1),
 
            parent_id: BranchId::new_invalid(),
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: PortIdLocal::new_invalid(),
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: HashMap::new(),
 
        }
 
    }
 

	
 
    fn new_branching(index: u32, parent_branch: &FakeBranch) -> FakeBranch {
 
        return FakeBranch {
 
            id: BranchId::new(index),
 
            parent_id: parent_branch.id,
 
            sync_state: SpeculativeState::RunningInSync,
 
            awaiting_port: parent_branch.awaiting_port,
 
            next_in_queue: BranchId::new_invalid(),
 
            inbox: parent_branch.inbox.clone(),
 
        }
 
    }
 

	
 
    pub fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) {
 
        debug_assert!(target_port.is_valid());
 
        debug_assert!(self.awaiting_port == target_port);
 
        self.awaiting_port = PortIdLocal::new_invalid();
 
        self.inbox.insert(target_port, contents);
 
    }
 
}
 

	
 
/// A little helper for native components that don't have a set of branches that
 
/// are actually executing code, but just have to manage the idea of branches
 
/// due to them performing the equivalent of a branching `get` call.
 
pub(crate) struct FakeTree {
 
    pub branches: Vec<FakeBranch>,
 
    queues: [BranchQueue; NUM_QUEUES],
 
}
 

	
 
impl FakeTree {
 
    pub fn new() -> Self {
 
        // TODO: Don't like this? Cause is that now we don't have a non-sync
 
        //  branch. But we assumed BranchId=0 means the branch is invalid. We
 
        //  can do the rusty Option<BranchId> stuff. But we still need a token
 
        //  value within the protocol to signify no-branch-id. Maybe the high
 
        //  bit? Branches are crazy expensive, no-one is going to have 2^32
 
        //  branches anyway. 2^31 isn't too bad.
 
        return Self {
 
            branches: vec![FakeBranch{
 
                id: BranchId::new_invalid(),
 
                parent_id: BranchId::new_invalid(),
 
                sync_state: SpeculativeState::RunningNonSync,
 
                awaiting_port: PortIdLocal::new_invalid(),
 
                next_in_queue: BranchId::new_invalid(),
 
                inbox: HashMap::new(),
 
            }],
 
            queues: [BranchQueue::new(); 3]
 
        }
 
    }
 

	
 
    fn is_in_sync(&self) -> bool {
 
        return !self.branches.is_empty();
 
        return self.branches.len() > 1;
 
    }
 

	
 
    pub fn queue_is_empty(&self, kind: QueueKind) -> bool {
 
        return self.queues[kind.as_index()].is_empty();
 
    }
 

	
 
    pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
 
        debug_assert_ne!(kind, QueueKind::FinishedSync);
 
        return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches);
 
    }
 

	
 
    pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) {
 
        push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id);
 
    }
 

	
 
    pub fn get_queue_first(&self, kind: QueueKind) -> Option<BranchId> {
 
        let queue = &self.queues[kind.as_index()];
 
        if queue.first.is_valid() {
 
            return Some(queue.first)
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    pub fn get_queue_next(&self, branch_id: BranchId) -> Option<BranchId> {
 
        let branch = &self.branches[branch_id.index as usize];
 
        if branch.next_in_queue.is_valid() {
 
            return Some(branch.next_in_queue);
 
        } else {
 
            return None;
 
        }
 
    }
 

	
 
    pub fn start_sync(&mut self) -> BranchId {
 
        debug_assert!(!self.is_in_sync());
 

	
 
        // Create the first branch
 
        let sync_branch = FakeBranch::new_root(0);
 
        let sync_branch = FakeBranch::new_root(1);
 
        let sync_branch_id = sync_branch.id;
 
        self.branches.push(sync_branch);
 

	
 
        return sync_branch_id;
 
    }
 

	
 
    pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId {
 
        debug_assert!(self.is_in_sync());
 
        let parent_branch = &self[parent_branch_id];
 
        let new_branch = FakeBranch::new_branching(self.branches.len() as u32, parent_branch);
 
        let new_branch_id = new_branch.id;
 
        self.branches.push(new_branch);
 

	
 
        return new_branch_id;
 
    }
 

	
 
    pub fn end_sync(&mut self, branch_id: BranchId) -> FakeBranch {
 
        debug_assert!(branch_id.is_valid());
 
        debug_assert!(self.is_in_sync());
 

	
 
        // Take out the succeeding branch, then just clear all fake branches.
 
        self.branches.swap(1, branch_id.index as usize);
 
        self.branches.truncate(2);
 
        let result = self.branches.pop().unwrap();
 

	
 
        for queue_index in 0..NUM_QUEUES {
 
            self.queues[queue_index] = BranchQueue::new();
 
        }
 

	
 
        return result;
 
    }
 
}
 

	
 
impl Index<BranchId> for FakeTree {
 
    type Output = FakeBranch;
 

	
 
    fn index(&self, index: BranchId) -> &Self::Output {
 
        return &self.branches[index.index as usize];
 
    }
 
}
 

	
 
impl IndexMut<BranchId> for FakeTree {
 
    fn index_mut(&mut self, index: BranchId) -> &mut Self::Output {
 
        return &mut self.branches[index.index as usize];
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// Shared logic
 
// -----------------------------------------------------------------------------
 

	
 
fn pop_from_queue<B: BranchListItem>(queue: &mut BranchQueue, branches: &mut [B]) -> Option<BranchId> {
 
    if queue.is_empty() {
 
        return None;
 
    } else {
 
        let first_branch = &mut branches[queue.first.index as usize];
 
        queue.first = first_branch.get_next_id();
 
        first_branch.set_next_id(BranchId::new_invalid());
 
        if !queue.first.is_valid() {
 
            queue.last = BranchId::new_invalid();
 
        }
 

	
 
        return Some(first_branch.get_id());
 
    }
 
}
 

	
 
fn push_into_queue<B: BranchListItem>(queue: &mut BranchQueue, branches: &mut [B], branch_id: BranchId) {
 
    debug_assert!(!branches[branch_id.index as usize].get_next_id().is_valid());
 
    if queue.is_empty() {
 
        queue.first = branch_id;
 
        queue.last = branch_id;
 
    } else {
 
        let last_branch = &mut branches[queue.last.index as usize];
 
        last_branch.set_next_id(branch_id);
 
        queue.last = branch_id;
 
    }
 
}
 
\ No newline at end of file
src/runtime2/native.rs
Show inline comments
 
@@ -4,531 +4,546 @@ use std::sync::atomic::Ordering;
 
use std::collections::HashMap;
 

	
 
use crate::protocol::ComponentCreationError;
 
use crate::protocol::eval::ValueGroup;
 

	
 
use super::{ConnectorKey, ConnectorId, RuntimeInner};
 
use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState};
 
use super::scheduler::{SchedulerCtx, ComponentCtx};
 
use super::port::{Port, PortIdLocal, Channel, PortKind};
 
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 
use super::connector::{ConnectorScheduling, ConnectorPDL};
 
use super::inbox::{Message, DataContent, DataMessage, SyncMessage, ControlContent, ControlMessage};
 

	
 
/// Generic connector interface from the scheduler's point of view.
 
pub(crate) trait Connector {
 
    /// Should run the connector's behaviour up until the next blocking point.
 
    /// One should generally request and handle new messages from the component
 
    /// context. Then perform any logic the component has to do, and in the
 
    /// process perhaps queue up some state changes using the same context.
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling;
 
}
 

	
 
pub(crate) struct FinishedSync {
 
    // In the order of the `get` calls
 
    inbox: Vec<ValueGroup>,
 
}
 

	
 
type SyncDone = Arc<(Mutex<Option<FinishedSync>>, Condvar)>;
 
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;
 

	
 
enum ApplicationJob {
 
    NewChannel((Port, Port)),
 
    NewConnector(ConnectorPDL, Vec<PortIdLocal>),
 
    SyncRound(Vec<ApplicationSyncAction>),
 
    Shutdown,
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ConnectorApplication
 
// -----------------------------------------------------------------------------
 

	
 
/// The connector which an application can directly interface with. Once may set
 
/// up the next synchronous round, and retrieve the data afterwards.
 
// TODO: Strong candidate for logic reduction in handling put/get. A lot of code
 
//  is an approximate copy-pasta from the regular component logic. I'm going to
 
//  wait until I'm implementing more native components to see which logic is
 
//  truly common.
 
pub struct ConnectorApplication {
 
    // Communicating about new jobs and setting up sync rounds
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    is_in_sync: bool,
 
    // Handling current sync round
 
    sync_desc: Vec<ApplicationSyncAction>,
 
    tree: FakeTree,
 
    consensus: Consensus,
 
    last_finished_handled: Option<BranchId>,
 
    branch_extra: Vec<usize>, // instruction counter per branch
 
}
 

	
 
impl Connector for ConnectorApplication {
 
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        if self.is_in_sync {
 
            let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
 
            let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync));
 
            while let Some(branch_id) = iter_id {
 
                iter_id = self.tree.get_queue_next(branch_id);
 
                self.last_finished_handled = Some(branch_id);
 

	
 
                if let Some(solution_branch) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
 
                    // Can finish sync round immediately
 
                    self.collapse_sync_to_solution_branch(solution_branch, comp_ctx);
 
                    return ConnectorScheduling::Immediate;
 
                }
 
            }
 

	
 
            return scheduling;
 
        } else {
 
            return self.run_in_deterministic_mode(sched_ctx, comp_ctx);
 
        }
 
    }
 
}
 

	
 
impl ConnectorApplication {
 
    pub(crate) fn new(runtime: Arc<RuntimeInner>) -> (Self, ApplicationInterface) {
 
        let sync_done = Arc::new(( Mutex::new(None), Condvar::new() ));
 
        let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32)));
 

	
 
        let connector = ConnectorApplication {
 
            sync_done: sync_done.clone(),
 
            job_queue: job_queue.clone(),
 
            is_in_sync: false,
 
            sync_desc: Vec::new(),
 
            tree: FakeTree::new(),
 
            consensus: Consensus::new(),
 
            last_finished_handled: None,
 
            branch_extra: Vec::new(),
 
            branch_extra: vec![0],
 
        };
 
        let interface = ApplicationInterface::new(sync_done, job_queue, runtime);
 

	
 
        return (connector, interface);
 
    }
 

	
 
    fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) {
 
        while let Some(message) = comp_ctx.read_next_message() {
 
            match message {
 
                Message::Data(_) => todo!("data message in API connector"),
 
                Message::Sync(_)  => todo!("sync message in API connector"),
 
                Message::Control(_) => todo!("impossible control message"),
 
                Message::Data(message) => self.handle_new_data_message(message, comp_ctx),
 
                Message::Sync(message) => self.handle_new_sync_message(message, comp_ctx),
 
                Message::Control(_) => unreachable!("control message in native API component"),
 
            }
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
 
        // Go through all branches that are awaiting new messages and see if
 
        // there is one that can receive this message.
 
        if !self.consensus.handle_new_data_message(&message, ctx) {
 
            // Old message, so drop it
 
            return;
 
        }
 

	
 
        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
 
        while let Some(branch_id) = iter_id {
 
            iter_id = self.tree.get_queue_next(branch_id);
 

	
 
            let branch = &self.tree[branch_id];
 
            if branch.awaiting_port != message.data_header.target_port { continue; }
 
            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
 

	
 
            // This branch can receive, so fork and given it the message
 
            let receiving_branch_id = self.tree.fork_branch(branch_id);
 
            debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
            self.branch_extra.push(self.branch_extra[branch_id.index as usize]); // copy instruction index
 
            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
            let receiving_branch = &mut self.tree[receiving_branch_id];
 

	
 
            receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
 
            self.consensus.notify_of_received_message(receiving_branch_id, &message);
 

	
 
            // And prepare the branch for running
 
            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 
        }
 
    }
 

	
 
    pub(crate) fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
 
        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
 
            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
 
        }
 
    }
 

	
 
    fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(self.is_in_sync);
 

	
 
        self.handle_new_messages(comp_ctx);
 

	
 
        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
 
        if branch_id.is_none() {
 
            return ConnectorScheduling::NotNow;
 
        }
 

	
 
        let branch_id = branch_id.unwrap();
 
        let branch = &mut self.tree[branch_id];
 
        let mut instruction_idx = self.branch_extra[branch_id.index as usize];
 

	
 
        if instruction_idx >= self.sync_desc.len() {
 
            // Performed last instruction, so this branch is officially at the
 
            // end of the synchronous interaction.
 
            let consistency = self.consensus.notify_of_finished_branch(branch_id);
 
            if consistency == Consistency::Valid {
 
                branch.sync_state = SpeculativeState::ReachedSyncEnd;
 
                self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
 
            } else {
 
                branch.sync_state = SpeculativeState::Inconsistent;
 
            }
 
        } else {
 
            // We still have instructions to perform
 
            let cur_instruction = &self.sync_desc[instruction_idx];
 
            self.branch_extra[branch_id.index as usize] += 1;
 

	
 
            match &cur_instruction {
 
                ApplicationSyncAction::Put(port_id, content) => {
 
                    let port_id = *port_id;
 
                    let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                    if consistency == Consistency::Valid {
 
                        let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
 
                        let message = Message::Data(DataMessage {
 
                            sync_header,
 
                            data_header,
 
                            content: DataContent::Message(content.clone()),
 
                        });
 
                        comp_ctx.submit_message(message);
 
                        self.tree.push_into_queue(QueueKind::Runnable, branch_id);
 
                        return ConnectorScheduling::Immediate;
 
                    } else {
 
                        branch.sync_state = SpeculativeState::Inconsistent;
 
                    }
 
                },
 
                ApplicationSyncAction::Get(port_id) => {
 
                    let port_id = *port_id;
 
                    let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
 
                    if consistency == Consistency::Valid {
 
                        branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
 
                        branch.awaiting_port = port_id;
 
                        self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
 

	
 
                        let mut any_message_received = false;
 
                        for message in comp_ctx.get_read_data_messages(port_id) {
 
                            if self.consensus.branch_can_receive(branch_id, &message) {
 
                                // This branch can receive the message, so we do the
 
                                // fork-and-receive dance
 
                                let receiving_branch_id = self.tree.fork_branch(branch_id);
 
                                let branch = &mut self.tree[receiving_branch_id];
 
                                debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len());
 
                                self.branch_extra.push(instruction_idx + 1);
 

	
 
                                branch.insert_message(port_id, message.content.as_message().unwrap().clone());
 

	
 
                                self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
 
                                self.consensus.notify_of_received_message(receiving_branch_id, &message);
 
                                self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
 

	
 
                                any_message_received = true;
 
                            }
 
                        }
 

	
 
                        if any_message_received {
 
                            return ConnectorScheduling::Immediate;
 
                        }
 
                    } else {
 
                        branch.sync_state = SpeculativeState::Inconsistent;
 
                    }
 
                }
 
            }
 
        }
 

	
 
        if self.tree.queue_is_empty(QueueKind::Runnable) {
 
            return ConnectorScheduling::NotNow;
 
        } else {
 
            return ConnectorScheduling::Later;
 
        }
 
    }
 

	
 
    fn run_in_deterministic_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
 
        debug_assert!(!self.is_in_sync);
 

	
 
        // In non-sync mode the application component doesn't really do anything
 
        // except performing jobs submitted from the API. This is the only
 
        // case where we expect to be woken up.
 
        // Note that we have to communicate to the scheduler when we've received
 
        // ports or created components (hence: given away ports) *before* we
 
        // enter a sync round.
 
        let mut queue = self.job_queue.lock().unwrap();
 
        while let Some(job) = queue.pop_front() {
 
            match job {
 
                ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => {
 
                    comp_ctx.push_port(endpoint_a);
 
                    comp_ctx.push_port(endpoint_b);
 

	
 
                    return ConnectorScheduling::Immediate;
 
                }
 
                ApplicationJob::NewConnector(connector, initial_ports) => {
 
                    comp_ctx.push_component(connector, initial_ports);
 

	
 
                    return ConnectorScheduling::Later;
 
                },
 
                ApplicationJob::SyncRound(mut description) => {
 
                    // Entering sync mode
 
                    comp_ctx.notify_sync_start();
 
                    self.sync_desc = description;
 
                    self.is_in_sync = true;
 
                    debug_assert!(self.last_finished_handled.is_none());
 
                    debug_assert!(self.branch_extra.is_empty());
 
                    debug_assert!(self.branch_extra.len() == 1);
 

	
 
                    let first_branch_id = self.tree.start_sync();
 
                    self.tree.push_into_queue(QueueKind::Runnable, first_branch_id);
 
                    debug_assert!(first_branch_id.index == 1);
 
                    self.consensus.start_sync(comp_ctx);
 
                    self.consensus.notify_of_new_branch(BranchId::new_invalid(), first_branch_id);
 
                    self.branch_extra.push(0); // set first branch to first instruction
 

	
 
                    return ConnectorScheduling::Immediate;
 
                },
 
                ApplicationJob::Shutdown => {
 
                    debug_assert!(queue.is_empty());
 

	
 
                    return ConnectorScheduling::Exit;
 
                }
 
            }
 
        }
 

	
 
        // Queue was empty
 
        return ConnectorScheduling::NotNow;
 
    }
 

	
 
    fn collapse_sync_to_solution_branch(&mut self, branch_id: BranchId, comp_ctx: &mut ComponentCtx) {
 
        debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program
 
        // Notifying tree, consensus algorithm and context of ending sync
 
        let mut fake_vec = Vec::new();
 
        let mut solution_branch = self.tree.end_sync(branch_id);
 
        self.consensus.end_sync(branch_id, &mut fake_vec);
 

	
 
        for port in fake_vec {
 
            debug_assert!(comp_ctx.get_port_by_id(port).is_some());
 
        }
 

	
 
        comp_ctx.notify_sync_end(&[]);
 

	
 
        // Turning hashmapped inbox into vector of values
 
        let mut inbox = Vec::with_capacity(solution_branch.inbox.len());
 
        for action in &self.sync_desc {
 
            match action {
 
                ApplicationSyncAction::Put(_, _) => {},
 
                ApplicationSyncAction::Get(port_id) => {
 
                    debug_assert!(solution_branch.inbox.contains_key(port_id));
 
                    inbox.push(solution_branch.inbox.remove(port_id).unwrap());
 
                },
 
            }
 
        }
 

	
 
        // Notifying interface of ending sync
 
        self.is_in_sync = false;
 
        self.sync_desc.clear();
 
        self.branch_extra.clear();
 
        self.branch_extra.truncate(1);
 
        self.last_finished_handled = None;
 

	
 
        let (results, notification) = &*self.sync_done;
 
        let mut results = results.lock().unwrap();
 
        *results = Some(FinishedSync{ inbox });
 
        notification.notify_one();
 
    }
 
}
 

	
 
// -----------------------------------------------------------------------------
 
// ApplicationInterface
 
// -----------------------------------------------------------------------------
 

	
 
#[derive(Debug)]
 
pub enum ChannelCreationError {
 
    InSync,
 
}
 

	
 
#[derive(Debug)]
 
pub enum ApplicationStartSyncError {
 
    AlreadyInSync,
 
    NoSyncActions,
 
    IncorrectPortKind,
 
    UnownedPort,
 
}
 

	
 
#[derive(Debug)]
 
pub enum ApplicationEndSyncError {
 
    NotInSync,
 
}
 

	
 
pub enum ApplicationSyncAction {
 
    Put(PortIdLocal, ValueGroup),
 
    Get(PortIdLocal),
 
}
 

	
 
/// The interface to a `ApplicationConnector`. This allows setting up the
 
/// interactions the `ApplicationConnector` performs within a synchronous round.
 
pub struct ApplicationInterface {
 
    sync_done: SyncDone,
 
    job_queue: JobQueue,
 
    runtime: Arc<RuntimeInner>,
 
    is_in_sync: bool,
 
    connector_id: ConnectorId,
 
    owned_ports: Vec<(PortKind, PortIdLocal)>,
 
}
 

	
 
impl ApplicationInterface {
 
    fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc<RuntimeInner>) -> Self {
 
        return Self{
 
            sync_done, job_queue, runtime,
 
            is_in_sync: false,
 
            connector_id: ConnectorId::new_invalid(),
 
            owned_ports: Vec::new(),
 
        }
 
    }
 

	
 
    /// Creates a new channel. Can only fail if the application interface is
 
    /// currently in sync mode.
 
    pub fn create_channel(&mut self) -> Result<Channel, ChannelCreationError> {
 
        if self.is_in_sync {
 
            return Err(ChannelCreationError::InSync);
 
        }
 

	
 
        let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id);
 
        debug_assert_eq!(getter_port.kind, PortKind::Getter);
 
        let getter_id = getter_port.self_id;
 
        let putter_id = putter_port.self_id;
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port)));
 
        }
 

	
 
        // Add to owned ports for error checking while creating a connector
 
        self.owned_ports.reserve(2);
 
        self.owned_ports.push((PortKind::Putter, putter_id));
 
        self.owned_ports.push((PortKind::Getter, getter_id));
 

	
 
        return Ok(Channel{ putter_id, getter_id });
 
    }
 

	
 
    /// Creates a new connector. Note that it is not scheduled immediately, but
 
    /// depends on the `ApplicationConnector` to run, followed by the created
 
    /// connector being scheduled.
 
    // TODO: Yank out scheduler logic for common use.
 
    pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> {
 
        if self.is_in_sync {
 
            return Err(ComponentCreationError::InSync);
 
        }
 

	
 
        // Retrieve ports and make sure that we own the ones that are currently
 
        // specified. This is also checked by the scheduler, but that is done
 
        // asynchronously.
 
        let mut initial_ports = Vec::new();
 
        find_ports_in_value_group(&arguments, &mut initial_ports);
 
        for initial_port in &initial_ports {
 
            if !self.owned_ports.iter().any(|(_, v)| v == initial_port) {
 
                return Err(ComponentCreationError::UnownedPort);
 
            }
 
        }
 

	
 
        // We own all ports, so remove them on this side
 
        for initial_port in &initial_ports {
 
            let position = self.owned_ports.iter().position(|(_, v)| v == initial_port).unwrap();
 
            self.owned_ports.remove(position);
 
        }
 

	
 
        let state = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?;
 
        let connector = ConnectorPDL::new(state);
 

	
 
        // Put on job queue
 
        {
 
            let mut queue = self.job_queue.lock().unwrap();
 
            queue.push_back(ApplicationJob::NewConnector(connector, initial_ports));
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 

	
 
        return Ok(());
 
    }
 

	
 
    /// Queues up a description of a synchronous round to run. Will not actually
 
    /// run the synchronous behaviour in blocking fashion. The results *must* be
 
    /// retrieved using `try_wait` or `wait` for the interface to be considered
 
    /// in non-sync mode.
 
    pub fn perform_sync_round(&mut self, actions: Vec<ApplicationSyncAction>) -> Result<(), ApplicationStartSyncError> {
 
        if self.is_in_sync {
 
            return Err(ApplicationStartSyncError::AlreadyInSync);
 
        }
 

	
 
        // Check the action ports for consistency
 
        for action in &actions {
 
            let (port_id, expected_kind) = match action {
 
                ApplicationSyncAction::Put(port_id, _) => (*port_id, PortKind::Putter),
 
                ApplicationSyncAction::Get(port_id) => (*port_id, PortKind::Getter),
 
            };
 

	
 
            match self.find_port_by_id(port_id) {
 
                Some(port_kind) => {
 
                    if port_kind != expected_kind {
 
                        return Err(ApplicationStartSyncError::IncorrectPortKind)
 
                    }
 
                },
 
                None => {
 
                    return Err(ApplicationStartSyncError::UnownedPort);
 
                }
 
            }
 
        }
 

	
 
        // Everything is consistent, go into sync mode and send the actions off
 
        // to the component that will actually perform the sync round
 
        self.is_in_sync = true;
 
        {
 
            let (is_done, _) = &*self.sync_done;
 
            let mut lock = is_done.lock().unwrap();
 
            *lock = None;
 
        }
 

	
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::SyncRound(actions));
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 
        return Ok(())
 
    }
 

	
 
    /// Wait until the next sync-round is finished
 
    pub fn wait(&self) -> Result<Vec<ValueGroup>, ApplicationEndSyncError> {
 
    pub fn wait(&mut self) -> Result<Vec<ValueGroup>, ApplicationEndSyncError> {
 
        if !self.is_in_sync {
 
            return Err(ApplicationEndSyncError::NotInSync);
 
        }
 

	
 
        let (is_done, condition) = &*self.sync_done;
 
        let mut lock = is_done.lock().unwrap();
 
        lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done
 

	
 
        self.is_in_sync = false;
 
        return Ok(lock.take().unwrap().inbox);
 
    }
 

	
 
    /// Called by runtime to set associated connector's ID.
 
    pub(crate) fn set_connector_id(&mut self, id: ConnectorId) {
 
        self.connector_id = id;
 
    }
 

	
 
    fn wake_up_connector_with_ping(&self) {
 
        let connector = self.runtime.get_component_public(self.connector_id);
 
        connector.inbox.insert_message(Message::Control(ControlMessage {
 
            id: 0,
 
            sending_component_id: self.connector_id,
 
            content: ControlContent::Ping,
 
        }));
 

	
 
        let should_wake_up = connector.sleeping
 
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire)
 
            .is_ok();
 

	
 
        if should_wake_up {
 
            let key = unsafe{ ConnectorKey::from_id(self.connector_id) };
 
            self.runtime.push_work(key);
 
        }
 
    }
 

	
 
    fn find_port_by_id(&self, port_id: PortIdLocal) -> Option<PortKind> {
 
        return self.owned_ports.iter()
 
            .find(|(_, owned_id)| *owned_id == port_id)
 
            .map(|(port_kind, _)| *port_kind);
 
    }
 
}
 

	
 
impl Drop for ApplicationInterface {
 
    fn drop(&mut self) {
 
        {
 
            let mut lock = self.job_queue.lock().unwrap();
 
            lock.push_back(ApplicationJob::Shutdown);
 
        }
 

	
 
        self.wake_up_connector_with_ping();
 
        self.runtime.decrement_active_interfaces();
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/api_component.rs
Show inline comments
 
// Testing the api component.
 
//
 
// These tests explicitly do not use the "NUM_INSTANCES" constant because we're
 
// doing some communication with the native component. Hence only expect one
 

	
 
use super::*;
 

	
 
#[test]
 
fn test_put_and_get() {
 
    const CODE: &'static str = "
 
    primitive handler(in<u32> request, out<u32> response, u32 loops) {
 
        u32 index = 0;
 
        while (index < loops) {
 
            synchronous {
 
                auto value = get(request);
 
                put(response, value * 2);
 
            }
 
            index += 1;
 
        }
 
    }
 
    ";
 

	
 
    let pd = ProtocolDescription::parse(CODE.as_bytes()).unwrap();
 
    let rt = Runtime::new(NUM_THREADS, pd);
 
    let mut api = rt.create_interface();
 

	
 
    let req_chan = api.create_channel().unwrap();
 
    let resp_chan = api.create_channel().unwrap();
 

	
 
    api.create_connector("", "handler", ValueGroup::new_stack(vec![
 
        Value::Input(PortId::new(req_chan.getter_id.index)),
 
        Value::Output(PortId::new(resp_chan.putter_id.index)),
 
        Value::UInt32(NUM_LOOPS),
 
    ])).unwrap();
 

	
 
    for loop_idx in 0..NUM_LOOPS {
 
        api.perform_sync_round(vec![
 
            ApplicationSyncAction::Put(req_chan.putter_id, ValueGroup::new_stack(vec![Value::UInt32(loop_idx)])),
 
            ApplicationSyncAction::Get(resp_chan.getter_id)
 
        ]).expect("start sync round");
 

	
 
        let result = api.wait().expect("finish sync round");
 
        assert!(result.len() == 1);
 
        if let Value::UInt32(gotten) = result[0].values[0] {
 
            assert_eq!(gotten, loop_idx * 2);
 
        } else {
 
            assert!(false);
 
        }
 
    }
 
}
 
\ No newline at end of file
src/runtime2/tests/mod.rs
Show inline comments
 
mod network_shapes;
 
mod api_component;
 

	
 
use super::*;
 
use crate::{PortId, ProtocolDescription};
 
use crate::common::Id;
 
use crate::protocol::eval::*;
 
use crate::runtime2::native::{ApplicationSyncAction};
 

	
 
//
 

	
 
// Generic testing constants, use when appropriate to simplify stress-testing
 
pub(crate) const NUM_THREADS: u32 = 3;     // number of threads in runtime
 
pub(crate) const NUM_INSTANCES: u32 = 5;   // number of test instances constructed
 
pub(crate) const NUM_LOOPS: u32 = 5;       // number of loops within a single test (not used by all tests)
 

	
 
fn create_runtime(pdl: &str) -> Runtime {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    return runtime;
 
}
 

	
 
fn run_test_in_runtime<F: Fn(&mut ApplicationInterface)>(pdl: &str, constructor: F) {
 
    let protocol = ProtocolDescription::parse(pdl.as_bytes())
 
        .expect("parse PDL");
 
    let runtime = Runtime::new(NUM_THREADS, protocol);
 

	
 
    let mut api = runtime.create_interface();
 
    for _ in 0..NUM_INSTANCES {
 
        constructor(&mut api);
 
    }
 

	
 
    // Wait until done :)
 
}
 

	
 
pub(crate) struct TestTimer {
 
    name: &'static str,
 
    started: std::time::Instant
 
}
 

	
 
impl TestTimer {
 
    pub(crate) fn new(name: &'static str) -> Self {
 
        Self{ name, started: std::time::Instant::now() }
 
    }
 
}
 

	
 
impl Drop for TestTimer {
 
    fn drop(&mut self) {
 
        let delta = std::time::Instant::now() - self.started;
 
        let nanos = (delta.as_secs_f64() * 1_000_000.0) as u64;
 
        let millis = nanos / 1000;
 
        let nanos = nanos % 1000;
 
        println!("[{}] Took {:>4}.{:03} ms", self.name, millis, nanos);
 
    }
 
}
0 comments (0 inline, 0 general)