use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::Ordering;
use std::collections::HashMap;

use crate::protocol::ComponentCreationError;
use crate::protocol::eval::ValueGroup;
use crate::runtime2::consensus::RoundConclusion;

use super::{ConnectorKey, ConnectorId, RuntimeInner};
use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState};
use super::scheduler::{SchedulerCtx, ComponentCtx};
use super::port::{Port, PortIdLocal, Channel, PortKind};
use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
use super::connector::{ConnectorScheduling, ConnectorPDL};
use super::inbox::{
    Message, DataContent, DataMessage,
    SyncCompMessage, SyncPortMessage,
    ControlContent, ControlMessage
};

/// Generic connector interface from the scheduler's point of view.
pub(crate) trait Connector {
    /// Should run the connector's behaviour up until the next blocking point.
    /// One should generally request and handle new messages from the component
    /// context. Then perform any logic the component has to do, and in the
    /// process perhaps queue up some state changes using the same context.
    ///
    /// The returned `ConnectorScheduling` tells the scheduler when (or whether)
    /// the connector wants to be run again.
    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling;
}

/// Result of a concluded synchronous round, handed from the connector to the
/// application-side interface.
pub(crate) struct FinishedSync {
    /// `true` if the round concluded with a solution, `false` if it failed.
    success: bool,
    /// Received values, in the order of the `get` calls of the round.
    inbox: Vec<ValueGroup>,
}

/// Shared slot holding the outcome of the last synchronous round, plus the
/// condition variable used to signal that the slot has been filled.
type SyncDone = Arc<(Mutex<Option<FinishedSync>>, Condvar)>;

/// Shared FIFO of jobs submitted through the application interface.
type JobQueue = Arc<Mutex<VecDeque<ApplicationJob>>>;

/// Work items the application interface submits to its connector.
enum ApplicationJob {
    /// Both endpoints of a freshly created channel.
    NewChannel((Port, Port)),
    /// A new component, together with the ports that are handed over to it.
    NewConnector(ConnectorPDL, Vec<PortIdLocal>),
    /// The ordered put/get actions making up the next synchronous round.
    SyncRound(Vec<ApplicationSyncAction>),
    /// The interface has been dropped, the connector may exit.
    Shutdown,
}

// -----------------------------------------------------------------------------
// ConnectorApplication
// -----------------------------------------------------------------------------

/// The connector which an application can directly interface with. One may set
/// up the next synchronous round, and retrieve the data afterwards.
// TODO: Strong candidate for logic reduction in handling put/get. A lot of code
// is an approximate copy-pasta from the regular component logic.
I'm going to // wait until I'm implementing more native components to see which logic is // truly common. pub struct ConnectorApplication { // Communicating about new jobs and setting up sync rounds sync_done: SyncDone, job_queue: JobQueue, is_in_sync: bool, // Handling current sync round sync_desc: Vec, tree: FakeTree, consensus: Consensus, last_finished_handled: Option, branch_extra: Vec, // instruction counter per branch } impl Connector for ConnectorApplication { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { if self.is_in_sync { let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); while let Some(branch_id) = iter_id { iter_id = self.tree.get_queue_next(branch_id); self.last_finished_handled = Some(branch_id); if let Some(conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { // Can finish sync round immediately self.collapse_sync_to_conclusion(conclusion, comp_ctx); return ConnectorScheduling::Immediate; } } return scheduling; } else { return self.run_in_deterministic_mode(sched_ctx, comp_ctx); } } } impl ConnectorApplication { pub(crate) fn new(runtime: Arc) -> (Self, ApplicationInterface) { let sync_done = Arc::new(( Mutex::new(None), Condvar::new() )); let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32))); let connector = ConnectorApplication { sync_done: sync_done.clone(), job_queue: job_queue.clone(), is_in_sync: false, sync_desc: Vec::new(), tree: FakeTree::new(), consensus: Consensus::new(), last_finished_handled: None, branch_extra: vec![0], }; let interface = ApplicationInterface::new(sync_done, job_queue, runtime); return (connector, interface); } fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) { while let Some(message) = comp_ctx.read_next_message() { match message { Message::Data(message) => self.handle_new_data_message(message, 
comp_ctx), Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx), Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx), Message::Control(_) => unreachable!("control message in native API component"), } } } pub(crate) fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. if !self.consensus.handle_new_data_message(&message, ctx) { // Old message, so drop it return; } let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); while let Some(branch_id) = iter_id { iter_id = self.tree.get_queue_next(branch_id); let branch = &self.tree[branch_id]; if branch.awaiting_port != message.data_header.target_port { continue; } if !self.consensus.branch_can_receive(branch_id, &message) { continue; } // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); self.branch_extra.push(self.branch_extra[branch_id.index as usize]); // copy instruction index self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); } } pub(crate) fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) { if let Some(conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) { self.collapse_sync_to_conclusion(conclusion, ctx); } } pub(crate) fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) { 
self.consensus.handle_new_sync_port_message(message, ctx); } fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { debug_assert!(self.is_in_sync); self.handle_new_messages(comp_ctx); let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); if branch_id.is_none() { return ConnectorScheduling::NotNow; } let branch_id = branch_id.unwrap(); let branch = &mut self.tree[branch_id]; let mut instruction_idx = self.branch_extra[branch_id.index as usize]; if instruction_idx >= self.sync_desc.len() { // Performed last instruction, so this branch is officially at the // end of the synchronous interaction. let consistency = self.consensus.notify_of_finished_branch(branch_id); if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); } else { branch.sync_state = SpeculativeState::Inconsistent; } } else { // We still have instructions to perform let cur_instruction = &self.sync_desc[instruction_idx]; self.branch_extra[branch_id.index as usize] += 1; match &cur_instruction { ApplicationSyncAction::Put(port_id, content) => { let port_id = *port_id; let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); let message = Message::Data(DataMessage { sync_header, data_header, content: DataContent::Message(content.clone()), }); comp_ctx.submit_message(message); self.tree.push_into_queue(QueueKind::Runnable, branch_id); return ConnectorScheduling::Immediate; }, ApplicationSyncAction::Get(port_id) => { let port_id = *port_id; branch.sync_state = SpeculativeState::HaltedAtBranchPoint; branch.awaiting_port = port_id; self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); let mut any_message_received = false; for message in comp_ctx.get_read_data_messages(port_id) { if self.consensus.branch_can_receive(branch_id, &message) { // This branch can receive the message, so we 
do the // fork-and-receive dance let receiving_branch_id = self.tree.fork_branch(branch_id); let branch = &mut self.tree[receiving_branch_id]; debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); self.branch_extra.push(instruction_idx + 1); branch.insert_message(port_id, message.content.as_message().unwrap().clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx); self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_message_received = true; } } if any_message_received { return ConnectorScheduling::Immediate; } } } } if self.tree.queue_is_empty(QueueKind::Runnable) { return ConnectorScheduling::NotNow; } else { return ConnectorScheduling::Later; } } fn run_in_deterministic_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { debug_assert!(!self.is_in_sync); // In non-sync mode the application component doesn't really do anything // except performing jobs submitted from the API. This is the only // case where we expect to be woken up. // Note that we have to communicate to the scheduler when we've received // ports or created components (hence: given away ports) *before* we // enter a sync round. 
let mut queue = self.job_queue.lock().unwrap(); while let Some(job) = queue.pop_front() { match job { ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { comp_ctx.push_port(endpoint_a); comp_ctx.push_port(endpoint_b); return ConnectorScheduling::Immediate; } ApplicationJob::NewConnector(connector, initial_ports) => { comp_ctx.push_component(connector, initial_ports); return ConnectorScheduling::Later; }, ApplicationJob::SyncRound(mut description) => { // Entering sync mode comp_ctx.notify_sync_start(); self.sync_desc = description; self.is_in_sync = true; debug_assert!(self.last_finished_handled.is_none()); debug_assert!(self.branch_extra.len() == 1); let first_branch_id = self.tree.start_sync(); self.tree.push_into_queue(QueueKind::Runnable, first_branch_id); debug_assert!(first_branch_id.index == 1); self.consensus.start_sync(comp_ctx); self.consensus.notify_of_new_branch(BranchId::new_invalid(), first_branch_id); self.branch_extra.push(0); // set first branch to first instruction return ConnectorScheduling::Immediate; }, ApplicationJob::Shutdown => { debug_assert!(queue.is_empty()); return ConnectorScheduling::Exit; } } } // Queue was empty return ConnectorScheduling::NotNow; } fn collapse_sync_to_conclusion(&mut self, conclusion: RoundConclusion, comp_ctx: &mut ComponentCtx) { // Notifying tree, consensus algorithm and context of ending sync let mut fake_vec = Vec::new(); let (branch_id, success) = match conclusion { RoundConclusion::Success(branch_id) => { debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program provided by API (branch_id, true) }, RoundConclusion::Failure => (BranchId::new_invalid(), false), }; let mut solution_branch = self.tree.end_sync(branch_id); self.consensus.end_sync(branch_id, &mut fake_vec); debug_assert!(fake_vec.is_empty()); comp_ctx.notify_sync_end(&[]); // Turning hashmapped inbox into vector of values let mut inbox = Vec::with_capacity(solution_branch.inbox.len()); for action 
in &self.sync_desc { match action { ApplicationSyncAction::Put(_, _) => {}, ApplicationSyncAction::Get(port_id) => { debug_assert!(solution_branch.inbox.contains_key(port_id)); inbox.push(solution_branch.inbox.remove(port_id).unwrap()); }, } } // Notifying interface of ending sync self.is_in_sync = false; self.sync_desc.clear(); self.branch_extra.truncate(1); self.last_finished_handled = None; let (results, notification) = &*self.sync_done; let mut results = results.lock().unwrap(); *results = Some(FinishedSync{ success, inbox }); notification.notify_one(); } } // ----------------------------------------------------------------------------- // ApplicationInterface // ----------------------------------------------------------------------------- #[derive(Debug)] pub enum ChannelCreationError { InSync, } #[derive(Debug)] pub enum ApplicationStartSyncError { AlreadyInSync, NoSyncActions, IncorrectPortKind, UnownedPort, } #[derive(Debug)] pub enum ApplicationEndSyncError { NotInSync, Failure, } pub enum ApplicationSyncAction { Put(PortIdLocal, ValueGroup), Get(PortIdLocal), } /// The interface to a `ApplicationConnector`. This allows setting up the /// interactions the `ApplicationConnector` performs within a synchronous round. pub struct ApplicationInterface { sync_done: SyncDone, job_queue: JobQueue, runtime: Arc, is_in_sync: bool, connector_id: ConnectorId, owned_ports: Vec<(PortKind, PortIdLocal)>, } impl ApplicationInterface { fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { return Self{ sync_done, job_queue, runtime, is_in_sync: false, connector_id: ConnectorId::new_invalid(), owned_ports: Vec::new(), } } /// Creates a new channel. Can only fail if the application interface is /// currently in sync mode. 
pub fn create_channel(&mut self) -> Result { if self.is_in_sync { return Err(ChannelCreationError::InSync); } let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id); debug_assert_eq!(getter_port.kind, PortKind::Getter); let getter_id = getter_port.self_id; let putter_id = putter_port.self_id; { let mut lock = self.job_queue.lock().unwrap(); lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port))); } // Add to owned ports for error checking while creating a connector self.owned_ports.reserve(2); self.owned_ports.push((PortKind::Putter, putter_id)); self.owned_ports.push((PortKind::Getter, getter_id)); return Ok(Channel{ putter_id, getter_id }); } /// Creates a new connector. Note that it is not scheduled immediately, but /// depends on the `ApplicationConnector` to run, followed by the created /// connector being scheduled. pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> { if self.is_in_sync { return Err(ComponentCreationError::InSync); } // Retrieve ports and make sure that we own the ones that are currently // specified. This is also checked by the scheduler, but that is done // asynchronously. 
let mut initial_ports = Vec::new(); find_ports_in_value_group(&arguments, &mut initial_ports); for initial_port in &initial_ports { if !self.owned_ports.iter().any(|(_, v)| v == initial_port) { return Err(ComponentCreationError::UnownedPort); } } // We own all ports, so remove them on this side for initial_port in &initial_ports { let position = self.owned_ports.iter().position(|(_, v)| v == initial_port).unwrap(); self.owned_ports.remove(position); } let prompt = self.runtime.protocol_description.new_component_v2(module.as_bytes(), routine.as_bytes(), arguments)?; let connector = ConnectorPDL::new(prompt); // Put on job queue { let mut queue = self.job_queue.lock().unwrap(); queue.push_back(ApplicationJob::NewConnector(connector, initial_ports)); } self.wake_up_connector_with_ping(); return Ok(()); } /// Queues up a description of a synchronous round to run. Will not actually /// run the synchronous behaviour in blocking fashion. The results *must* be /// retrieved using `try_wait` or `wait` for the interface to be considered /// in non-sync mode. // TODO: Maybe change API in the future. 
For now it does the job pub fn perform_sync_round(&mut self, actions: Vec) -> Result<(), ApplicationStartSyncError> { if self.is_in_sync { return Err(ApplicationStartSyncError::AlreadyInSync); } // Check the action ports for consistency for action in &actions { let (port_id, expected_kind) = match action { ApplicationSyncAction::Put(port_id, _) => (*port_id, PortKind::Putter), ApplicationSyncAction::Get(port_id) => (*port_id, PortKind::Getter), }; match self.find_port_by_id(port_id) { Some(port_kind) => { if port_kind != expected_kind { return Err(ApplicationStartSyncError::IncorrectPortKind) } }, None => { return Err(ApplicationStartSyncError::UnownedPort); } } } // Everything is consistent, go into sync mode and send the actions off // to the component that will actually perform the sync round self.is_in_sync = true; { let (is_done, _) = &*self.sync_done; let mut lock = is_done.lock().unwrap(); *lock = None; } { let mut lock = self.job_queue.lock().unwrap(); lock.push_back(ApplicationJob::SyncRound(actions)); } self.wake_up_connector_with_ping(); return Ok(()) } /// Wait until the next sync-round is finished, returning the received /// messages in order of `get` calls. pub fn wait(&mut self) -> Result, ApplicationEndSyncError> { if !self.is_in_sync { return Err(ApplicationEndSyncError::NotInSync); } let (is_done, condition) = &*self.sync_done; let mut lock = is_done.lock().unwrap(); lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done self.is_in_sync = false; let result = lock.take().unwrap(); if result.success { return Ok(result.inbox); } else { return Err(ApplicationEndSyncError::Failure); } } /// Called by runtime to set associated connector's ID. 
pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { self.connector_id = id; } fn wake_up_connector_with_ping(&self) { let connector = self.runtime.get_component_public(self.connector_id); connector.inbox.insert_message(Message::Control(ControlMessage { id: 0, sending_component_id: self.connector_id, content: ControlContent::Ping, })); let should_wake_up = connector.sleeping .compare_exchange(true, false, Ordering::SeqCst, Ordering::Acquire) .is_ok(); if should_wake_up { let key = unsafe{ ConnectorKey::from_id(self.connector_id) }; self.runtime.push_work(key); } } fn find_port_by_id(&self, port_id: PortIdLocal) -> Option { return self.owned_ports.iter() .find(|(_, owned_id)| *owned_id == port_id) .map(|(port_kind, _)| *port_kind); } } impl Drop for ApplicationInterface { fn drop(&mut self) { { let mut lock = self.job_queue.lock().unwrap(); lock.push_back(ApplicationJob::Shutdown); } self.wake_up_connector_with_ping(); self.runtime.decrement_active_interfaces(); } }