From 2d2a4eb3c08edc03163e7be8aae67bf411b1fa1f 2021-11-11 18:46:33 From: MH Date: 2021-11-11 18:46:33 Subject: [PATCH] Initial version of API put/get --- diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 62481340e1b64cfaf42180b4dc1041409f4d733b..c806614529f4e902549e03aff14271826afe2623 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -52,6 +52,7 @@ pub enum ComponentCreationError { InvalidNumArguments, InvalidArgumentType(usize), UnownedPort, + InSync, } impl std::fmt::Debug for ProtocolDescription { diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index 1b296960bcfabfc69b0176b84cd9b13b012e6bd3..465d52f779922ffc6ca94a0c13980c9317ef4f95 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -6,6 +6,13 @@ use crate::protocol::eval::{Value, ValueGroup}; use super::port::PortIdLocal; +// To share some logic between the FakeTree and ExecTree implementation +trait BranchListItem { + #[inline] fn get_id(&self) -> BranchId; + #[inline] fn set_next_id(&mut self, id: BranchId); + #[inline] fn get_next_id(&self) -> BranchId; +} + /// Generic branch ID. A component will always have one branch: the /// non-speculative branch. This branch has ID 0. Hence in a speculative context /// we use this fact to let branch ID 0 denote the ID being invalid. @@ -60,6 +67,12 @@ pub(crate) struct Branch { pub prepared_channel: Option<(Value, Value)>, // TODO: Maybe remove? } +impl BranchListItem for Branch { + #[inline] fn get_id(&self) -> BranchId { return self.id; } + #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; } + #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; } +} + impl Branch { /// Creates a new non-speculative branch pub(crate) fn new_non_sync(component_state: ComponentState) -> Self { @@ -148,6 +161,10 @@ impl QueueKind { } } +// ----------------------------------------------------------------------------- +// ExecTree +// ----------------------------------------------------------------------------- + /// Execution tree of branches. Tries to keep the extra information stored /// herein to a minimum. So the execution tree is aware of the branches, their /// execution state and the way they're dependent on each other, but the @@ -188,32 +205,12 @@ impl ExecTree { /// Pops a branch (ID) from a queue. pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { debug_assert_ne!(kind, QueueKind::FinishedSync); // for purposes of logic we expect the queue to grow during a sync round - let queue = &mut self.queues[kind.as_index()]; - if queue.is_empty() { - return None; - } else { - let first_branch = &mut self.branches[queue.first.index as usize]; - queue.first = first_branch.next_in_queue; - first_branch.next_in_queue = BranchId::new_invalid(); - if !queue.first.is_valid() { - queue.last = BranchId::new_invalid(); - } - - return Some(first_branch.id); - } + return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches); } /// Pushes a branch (ID) into a queue. pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) { - let queue = &mut self.queues[kind.as_index()]; - if queue.is_empty() { - queue.first = id; - queue.last = id; - } else { - let last_branch = &mut self.branches[queue.last.index as usize]; - last_branch.next_in_queue = id; - queue.last = id; - } + push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id); } /// Returns the non-sync branch (TODO: better name?) @@ -225,25 +222,13 @@ impl ExecTree { /// Returns an iterator over all the elements in the queue of the given /// kind. One can start the iteration at the branch *after* the provided /// branch. Just make sure it actually is in the provided queue. - pub fn iter_queue(&self, kind: QueueKind, start_at: Option) -> BranchQueueIter { + pub fn iter_queue(&self, kind: QueueKind, start_after: Option) -> BranchQueueIter<'_, Branch> { + // Make sure branch is in correct queue while in debug mode + debug_assert!(start_after + .map(|branch_id| self.iter_queue(kind, None).any(|v| v.id == branch_id)) + .unwrap_or(true)); let queue = &self.queues[kind.as_index()]; - - let index = match start_at { - Some(branch_id) => { - debug_assert!(self.iter_queue(kind, None).any(|v| v.id == branch_id)); - let branch = &self.branches[branch_id.index as usize]; - - branch.next_in_queue.index as usize - }, - None => { - queue.first.index as usize - } - }; - - return BranchQueueIter { - branches: self.branches.as_slice(), - index, - } + return iter_queue(queue, &self.branches, start_after); } /// Returns an iterator that starts with the provided branch, and then @@ -324,13 +309,14 @@ impl IndexMut for ExecTree { } } -pub(crate) struct BranchQueueIter<'a> { - branches: &'a [Branch], +/// Iterator over branches in a `ExecTree` queue. +pub(crate) struct BranchQueueIter<'a, B: BranchListItem> { + branches: &'a [B], index: usize, } -impl<'a> Iterator for BranchQueueIter<'a> { - type Item = &'a Branch; +impl<'a, B: BranchListItem> Iterator for BranchQueueIter<'a, B> { + type Item = &'a B; fn next(&mut self) -> Option { if self.index == 0 { @@ -339,11 +325,12 @@ impl<'a> Iterator for BranchQueueIter<'a> { } let branch = &self.branches[self.index]; - self.index = branch.next_in_queue.index as usize; + self.index = branch.get_next_id().index as usize; return Some(branch); } } +/// Iterator over the parents of an `ExecTree` branch. pub(crate) struct BranchParentIter<'a> { branches: &'a [Branch], index: usize, @@ -361,4 +348,197 @@ impl<'a> Iterator for BranchParentIter<'a> { self.index = branch.parent_id.index as usize; return Some(branch); } +} + +// ----------------------------------------------------------------------------- +// FakeTree +// ----------------------------------------------------------------------------- + +/// Generic fake branch. This is supposed to be used in conjunction with the +/// fake tree. The purpose is to have a branching-like tree to use in +/// combination with a consensus algorithm in places where we don't have PDL +/// code. +pub(crate) struct FakeBranch { + pub id: BranchId, + pub parent_id: BranchId, + pub sync_state: SpeculativeState, + pub awaiting_port: PortIdLocal, + pub next_in_queue: BranchId, + pub inbox: HashMap, +} + +impl BranchListItem for FakeBranch { + #[inline] fn get_id(&self) -> BranchId { return self.id; } + #[inline] fn set_next_id(&mut self, id: BranchId) { self.next_in_queue = id; } + #[inline] fn get_next_id(&self) -> BranchId { return self.next_in_queue; } +} + +impl FakeBranch { + fn new_root(index: u32) -> FakeBranch { + debug_assert!(index == 0); + return FakeBranch{ + id: BranchId::new_invalid(), + parent_id: BranchId::new_invalid(), + sync_state: SpeculativeState::RunningInSync, + awaiting_port: PortIdLocal::new_invalid(), + next_in_queue: BranchId::new_invalid(), + inbox: HashMap::new(), + } + } + + fn new_branching(index: u32, parent_branch: &FakeBranch) -> FakeBranch { + return FakeBranch { + id: BranchId::new(index), + parent_id: parent_branch.id, + sync_state: SpeculativeState::RunningInSync, + awaiting_port: parent_branch.awaiting_port, + next_in_queue: BranchId::new_invalid(), + inbox: parent_branch.inbox.clone(), + } + } + + pub fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) { + debug_assert!(target_port.is_valid()); + debug_assert!(self.awaiting_port == target_port); + self.awaiting_port = PortIdLocal::new_invalid(); + self.inbox.insert(target_port, contents); + } +} + +/// A little helper for native components that don't have a set of branches that +/// are actually executing code, but just have to manage the idea of branches +/// due to them performing the equivalent of a branching `get` call. +pub(crate) struct FakeTree { + pub branches: Vec, + queues: [BranchQueue; NUM_QUEUES], +} + +impl FakeTree { + pub fn new() -> Self { + return Self { + branches: Vec::new(), + queues: [BranchQueue::new(); 3] + } + } + + fn is_in_sync(&self) -> bool { + return !self.branches.is_empty(); + } + + pub fn queue_is_empty(&self, kind: QueueKind) -> bool { + return self.queues[kind.as_index()].is_empty(); + } + + pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option { + debug_assert_ne!(kind, QueueKind::FinishedSync); + return pop_from_queue(&mut self.queues[kind.as_index()], &mut self.branches); + } + + pub fn push_into_queue(&mut self, kind: QueueKind, id: BranchId) { + push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id); + } + + pub fn iter_queue(&self, kind: QueueKind, start_after: Option) -> BranchQueueIter<'_, FakeBranch> { + debug_assert!(start_after + .map(|branch_id| self.iter_queue(kind, None).any(|v| v.id == branch_id)) + .unwrap_or(true) + ); + return iter_queue(&self.queues[kind.as_index()], &self.branches, start_after); + } + + pub fn start_sync(&mut self) -> BranchId { + debug_assert!(!self.is_in_sync()); + + // Create the first branch + let sync_branch = FakeBranch::new_root(0); + let sync_branch_id = sync_branch.id; + self.branches.push(sync_branch); + + return sync_branch_id; + } + + pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId { + debug_assert!(self.is_in_sync()); + let parent_branch = &self[parent_branch_id]; + let new_branch = FakeBranch::new_branching(self.branches.len() as u32, parent_branch); + let new_branch_id = new_branch.id; + self.branches.push(new_branch); + + return new_branch_id; + } + + pub fn end_sync(&mut self, branch_id: BranchId) -> FakeBranch { + debug_assert!(self.is_in_sync()); + debug_assert!(self.iter_queue(QueueKind::FinishedSync, None).any(|v| v.id == BranchId)); + + // Take out the succeeding branch, then just clear all fake branches. + let mut iter = self.branches.drain(branch_id.index..); + let result = iter.next().unwrap(); + + for queue_index in 0..NUM_QUEUES { + self.queues[queue_index] = BranchQueue::new(); + } + + return result; + } +} + +impl Index for FakeTree { + type Output = FakeBranch; + + fn index(&self, index: BranchId) -> &Self::Output { + return &self.branches[index.index as usize]; + } +} + +impl IndexMut for FakeTree { + fn index_mut(&mut self, index: BranchId) -> &mut Self::Output { + return &mut self.branches[index.index as usize]; + } +} + +// ----------------------------------------------------------------------------- +// Shared logic +// ----------------------------------------------------------------------------- + +fn pop_from_queue(queue: &mut BranchQueue, branches: &mut [B]) -> Option { + if queue.is_empty() { + return None; + } else { + let first_branch = &mut branches[queue.first.index as usize]; + queue.first = first_branch.get_next_id(); + first_branch.set_next_id(BranchId::new_invalid()); + if !queue.first.is_valid() { + queue.last = BranchId::new_invalid(); + } + + return Some(first_branch.get_id()); + } +} + +fn push_into_queue(queue: &mut BranchQueue, branches: &mut [B], branch_id: BranchId) { + debug_assert!(!branches[branch_id.index as usize].get_next_id().is_valid()); + if queue.is_empty() { + queue.first = branch_id; + queue.last = branch_id; + } else { + let last_branch = &mut branches[queue.last as usize]; + last_branch.set_next_id(branch_id); + queue.last = branch_id; + } +} + +fn iter_queue<'a, B: BranchListItem>(queue: &BranchQueue, branches: &'a [B], start_after: Option) -> BranchQueueIter<'a, B> { + let index = match start_after { + Some(branch_id) => { + // Assuming caller is correct and that the branch is in the queue + let first_branch = &branches[branch_id.index as usize]; + first_branch.get_next_id().index as usize + }, + None => { + queue.first.index as usize + } + }; + + return BranchQueueIter{ branches, index }; } \ No newline at end of file diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 943e149661563948bd38169e4e07b8c0ddd74a11..bdda7503cfdd8efaac4edfa57a0e2365ff67c1cc 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -239,7 +239,7 @@ impl ConnectorPDL { // Note: we only know that a branch is waiting on a message when // it reaches the `get` call. But we might have already received // a message that targets this branch, so check now. - let mut any_branch_received = false; + let mut any_message_received = false; for message in comp_ctx.get_read_data_messages(port_id) { if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) { // This branch can receive the message, so we do the @@ -253,11 +253,11 @@ impl ConnectorPDL { self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); - any_branch_received = true; + any_message_received = true; } } - if any_branch_received { + if any_message_received { return ConnectorScheduling::Immediate; } } else { @@ -269,7 +269,7 @@ impl ConnectorPDL { if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); - } else if consistency == Consistency::Inconsistent { + } else { branch.sync_state = SpeculativeState::Inconsistent; } }, diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index c965d64b803233458efe6ae822988749577f809f..f69f1392977a41b4413618fefe4e1b13441230ea 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,6 +1,7 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; +use crate::runtime2::branch::BranchQueueIter; use super::branch::{BranchId, ExecTree, QueueKind}; use super::ConnectorId; @@ -331,7 +332,7 @@ impl Consensus { /// `branch_can_receive` function. /// 2. We return the branches that *can* receive the message, you still /// have to explicitly call `notify_of_received_message`. - pub fn handle_new_data_message(&mut self, exec_tree: &ExecTree, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) -> bool { + pub fn handle_new_data_message(&mut self, potential_receivers: BranchQueueIter<'_, >, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) -> bool { self.handle_received_data_header(exec_tree, &message.sync_header, &message.data_header, &message.content, target_ids); return self.handle_received_sync_header(&message.sync_header, ctx) } diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 73ac568570c9bbfd328c4b774335232f1de83c09..10a1ec47a20a917d4a33df1692de024d68190edc 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -4,6 +4,9 @@ use std::sync::atomic::Ordering; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; +use crate::runtime2::branch::{FakeTree, QueueKind, SpeculativeState}; +use crate::runtime2::consensus::{Consensus, Consistency}; +use crate::runtime2::inbox::{DataContent, DataMessage, SyncMessage}; use super::{ConnectorKey, ConnectorId, RuntimeInner}; use super::scheduler::{SchedulerCtx, ComponentCtx}; @@ -27,14 +30,38 @@ type JobQueue = Arc>>; enum ApplicationJob { NewChannel((Port, Port)), NewConnector(ConnectorPDL, Vec), + SyncRound(Vec), Shutdown, } +// ----------------------------------------------------------------------------- +// ConnectorApplication +// ----------------------------------------------------------------------------- + /// The connector which an application can directly interface with. Once may set /// up the next synchronous round, and retrieve the data afterwards. +// TODO: Strong candidate for logic reduction in handling put/get. A lot of code +// is an approximate copy-pasta from the regular component logic. pub struct ConnectorApplication { + // Communicating about new jobs and setting up sync rounds sync_done: SyncDone, job_queue: JobQueue, + is_in_sync: bool, + // Handling current sync round + sync_desc: Vec, + exec_tree: FakeTree, + consensus: Consensus, + branch_extra: Vec, // instruction counter per branch +} + +impl Connector for ConnectorApplication { + fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + if self.is_in_sync { + return self.run_in_sync_mode(sched_ctx, comp_ctx); + } else { + return self.run_in_deterministic_mode(sched_ctx, comp_ctx); + } + } } impl ConnectorApplication { @@ -44,17 +71,16 @@ impl ConnectorApplication { let connector = ConnectorApplication { sync_done: sync_done.clone(), - job_queue: job_queue.clone() + job_queue: job_queue.clone(), + is_in_sync: false, + sync_desc: Vec::new(), }; let interface = ApplicationInterface::new(sync_done, job_queue, runtime); return (connector, interface); } -} -impl Connector for ConnectorApplication { - fn run(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { - // Handle any incoming messages if we're participating in a round + fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) { while let Some(message) = comp_ctx.read_next_message() { match message { Message::Data(_) => todo!("data message in API connector"), @@ -62,52 +88,230 @@ impl Connector for ConnectorApplication { Message::Control(_) => todo!("impossible control message"), } } + } + + pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { + // Go through all branches that are awaiting new messages and see if + // there is one that can receive this message. + debug_assert!(ctx.workspace_branches.is_empty()); + let mut branches = Vec::new(); // TODO: @Remove + if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) { + // Old message, so drop it + return; + } - // Handle requests coming from the API - { - let mut queue = self.job_queue.lock().unwrap(); - while let Some(job) = queue.pop_front() { - match job { - ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { - comp_ctx.push_port(endpoint_a); - comp_ctx.push_port(endpoint_b); + for branch_id in branches.drain(..) { + // This branch can receive, so fork and given it the message + let receiving_branch_id = self.tree.fork_branch(branch_id); + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + let receiving_branch = &mut self.tree[receiving_branch_id]; + + receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); + self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); + + // And prepare the branch for running + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + } + } + + pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { + if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { + self.collapse_sync_to_solution_branch(solution_branch_id, ctx); + } + } + + fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + debug_assert!(self.is_in_sync); + + self.handle_new_messages(comp_ctx); + + let branch_id = self.exec_tree.pop_from_queue(QueueKind::Runnable); + if branch_id.is_none() { + return ConnectorScheduling::NotNow; + } + + let branch_id = branch_id.unwrap(); + let branch = &mut self.exec_tree[branch_id]; + let mut instruction_idx = self.branch_extra[branch_id.index as usize]; + + if instruction_idx >= self.sync_desc.len() { + // Performed last instruction, so this branch is officially at the + // end of the synchronous interaction. + let consistency = self.consensus.notify_of_finished_branch(branch_id); + if consistency == Consistency::Valid { + branch.sync_state = SpeculativeState::ReachedSyncEnd; + self.exec_tree.push_into_queue(QueueKind::FinishedSync, branch_id); + } else { + branch.sync_state = SpeculativeState::Inconsistent; + } + } else { + // We still have instructions to perform + let cur_instruction = &self.sync_desc[instruction_idx]; + self.branch_extra[branch_id.index as usize] += 1; + + match &cur_instruction { + ApplicationSyncAction::Put(port_id, content) => { + let port_id = *port_id; + let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); + if consistency == Consistency::Valid { + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, ctx); + let message = Message::Data(DataMessage { + sync_header, + data_header, + content: DataContent::Message(content.clone()), + }); + comp_ctx.submit_message(message); + self.exec_tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; + } else { + branch.sync_state = SpeculativeState::Inconsistent; } - ApplicationJob::NewConnector(connector, initial_ports) => { - comp_ctx.push_component(connector, initial_ports); - }, - ApplicationJob::Shutdown => { - debug_assert!(queue.is_empty()); - return ConnectorScheduling::Exit; + }, + ApplicationSyncAction::Get(port_id) => { + let port_id = *port_id; + let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); + if consistency == Consistency::Valid { + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + branch.awaiting_port = port_id; + self.exec_tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + let mut any_message_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let receiving_branch_id = self.exec_tree.fork_branch(branch_id); + let branch = &mut self.exec_tree[receiving_branch_id]; + debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); + self.branch_extra.push(instruction_idx + 1); + + branch.insert_message(port_id, message.content.as_message().unwrap().clone()); + + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); + self.exec_tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + + any_message_received = true; + } + } + + if any_message_received { + return ConnectorScheduling::Immediate; + } + } else { + branch.sync_state = SpeculativeState::Inconsistent; } } } } + if self.exec_tree.queue_is_empty(QueueKind::Runnable) { + return ConnectorScheduling::NotNow; + } else { + return ConnectorScheduling::Later; + } + } + + fn run_in_deterministic_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + debug_assert!(!self.is_in_sync); + + // In non-sync mode the application component doesn't really do anything + // except performing jobs submitted from the API. This is the only + // case where we expect to be woken up. + let mut queue = self.job_queue.lock().unwrap(); + while let Some(job) = queue.pop_front() { + match job { + ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { + comp_ctx.push_port(endpoint_a); + comp_ctx.push_port(endpoint_b); + } + ApplicationJob::NewConnector(connector, initial_ports) => { + comp_ctx.push_component(connector, initial_ports); + }, + ApplicationJob::SyncRound(mut description) => { + // Entering sync mode + if description.is_empty() { + // To simplify logic we always have one instruction + description.push(ApplicationSyncAction::Noop); + } + + self.sync_desc = description; + self.is_in_sync = true; + debug_assert!(self.branch_extra.is_empty()); + + let first_branch_id = self.exec_tree.start_sync(); + self.exec_tree.push_into_queue(QueueKind::Runnable, first_branch_id); + self.consensus.start_sync(ctx); + self.branch_extra.push(0); // set first branch to first instruction + + return ConnectorScheduling::Immediate; + }, + ApplicationJob::Shutdown => { + debug_assert!(queue.is_empty()); + return ConnectorScheduling::Exit; + } + } + } + return ConnectorScheduling::NotNow; } } +// ----------------------------------------------------------------------------- +// ApplicationInterface +// ----------------------------------------------------------------------------- + +#[derive(Debug)] +pub enum ChannelCreationError { + InSync, +} + +#[derive(Debug)] +pub enum ApplicationStartSyncError { + AlreadyInSync, + NoSyncActions, + IncorrectPortKind, + UnownedPort, +} + +#[derive(Debug)] +pub enum ApplicationEndSyncError { + NotInSync, +} + +pub enum ApplicationSyncAction { + Put(PortIdLocal, ValueGroup), + Get(PortIdLocal), +} + /// The interface to a `ApplicationConnector`. This allows setting up the /// interactions the `ApplicationConnector` performs within a synchronous round. pub struct ApplicationInterface { sync_done: SyncDone, job_queue: JobQueue, runtime: Arc, + is_in_sync: bool, connector_id: ConnectorId, - owned_ports: Vec, + owned_ports: Vec<(PortKind, PortIdLocal)>, } impl ApplicationInterface { fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { return Self{ sync_done, job_queue, runtime, + is_in_sync: false, connector_id: ConnectorId::new_invalid(), owned_ports: Vec::new(), } } - /// Creates a new channel. - pub fn create_channel(&mut self) -> Channel { + /// Creates a new channel. Can only fail if the application interface is + /// currently in sync mode. + pub fn create_channel(&mut self) -> Result { + if self.is_in_sync { + return Err(ChannelCreationError::InSync); + } + let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id); debug_assert_eq!(getter_port.kind, PortKind::Getter); let getter_id = getter_port.self_id; @@ -120,10 +324,10 @@ impl ApplicationInterface { // Add to owned ports for error checking while creating a connector self.owned_ports.reserve(2); - self.owned_ports.push(putter_id); - self.owned_ports.push(getter_id); + self.owned_ports.push((PortKind::Putter, putter_id)); + self.owned_ports.push((PortKind::Getter, getter_id)); - return Channel{ putter_id, getter_id }; + return Ok(Channel{ putter_id, getter_id }); } /// Creates a new connector. Note that it is not scheduled immediately, but @@ -131,6 +335,10 @@ impl ApplicationInterface { /// connector being scheduled. // TODO: Yank out scheduler logic for common use. pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> { + if self.is_in_sync { + return Err(ComponentCreationError::InSync); + } + // Retrieve ports and make sure that we own the ones that are currently // specified. This is also checked by the scheduler, but that is done // asynchronously. @@ -162,15 +370,58 @@ impl ApplicationInterface { return Ok(()); } - /// Check if the next sync-round is finished. - pub fn try_wait(&self) -> bool { - let (is_done, _) = &*self.sync_done; - let lock = is_done.lock().unwrap(); - return *lock; + /// Queues up a description of a synchronous round to run. Will not actually + /// run the synchronous behaviour in blocking fashion. The results *must* be + /// retrieved using `try_wait` or `wait` for the interface to be considered + /// in non-sync mode. + pub fn perform_sync_round(&mut self, actions: Vec) -> Result<(), ApplicationStartSyncError> { + if self.is_in_sync { + return Err(ApplicationStartSyncError::AlreadyInSync); + } + + // Check the action ports for consistency + for action in &actions { + let (port_id, expected_kind) = match action { + ApplicationSyncAction::Put(port_id, _) => (*port_id, PortKind::Putter), + ApplicationSyncAction::Get(port_id) => (*port_id, PortKind::Getter), + }; + + match self.find_port_by_id(port_id) { + Some(port_kind) => { + if port_kind != expected_kind { + return Err(ApplicationStartSyncError::IncorrectPortKind) + } + }, + None => { + return Err(ApplicationStartSyncError::UnownedPort); + } + } + } + + // Everything is consistent, go into sync mode and send the actions off + // to the component that will actually perform the sync round + self.is_in_sync = true; + { + let (is_done, _) = &*self.sync_done; + let mut lock = is_done.lock().unwrap(); + *lock = false; + } + + { + let mut lock = self.job_queue.lock().unwrap(); + lock.push_back(ApplicationJob::SyncRound(actions)); + } + + self.wake_up_connector_with_ping(); + return Ok(()) } /// Wait until the next sync-round is finished - pub fn wait(&self) { + pub fn wait(&self) -> Result, ApplicationEndSyncError> { + if !self.is_in_sync { + return Err(ApplicationEndSyncError::NotInSync); + } + let (is_done, condition) = &*self.sync_done; let lock = is_done.lock().unwrap(); condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done @@ -198,6 +449,12 @@ impl ApplicationInterface { self.runtime.push_work(key); } } + + fn find_port_by_id(&self, port_id: PortIdLocal) -> Option { + return self.owned_ports.iter() + .find(|(_, owned_id)| *owned_id == port_id) + .map(|(port_kind, _)| *port_kind); + } } impl Drop for ApplicationInterface {