diff --git a/src/runtime/native.rs b/src/runtime/native.rs new file mode 100644 index 0000000000000000000000000000000000000000..b92c59536f1423adb045fa1f53099ed75095c7b7 --- /dev/null +++ b/src/runtime/native.rs @@ -0,0 +1,559 @@ +use std::collections::VecDeque; +use std::sync::{Arc, Mutex, Condvar}; + +use crate::protocol::ComponentCreationError; +use crate::protocol::eval::ValueGroup; +use crate::runtime::consensus::RoundConclusion; + +use super::{ConnectorId, RuntimeInner}; +use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState}; +use super::scheduler::{SchedulerCtx, ComponentCtx, MessageTicket}; +use super::port::{Port, PortIdLocal, Channel, PortKind}; +use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; +use super::connector::{ConnectorScheduling, ConnectorPDL}; +use super::inbox::{ + Message, DataMessage, + SyncCompMessage, SyncPortMessage, + ControlContent, ControlMessage +}; + +/// Generic connector interface from the scheduler's point of view. +pub(crate) trait Connector { + /// Should run the connector's behaviour up until the next blocking point. + /// One should generally request and handle new messages from the component + /// context. Then perform any logic the component has to do, and in the + /// process perhaps queue up some state changes using the same context. + fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling; +} + +pub(crate) struct FinishedSync { + // In the order of the `get` calls + success: bool, + inbox: Vec, +} + +type SyncDone = Arc<(Mutex>, Condvar)>; +type JobQueue = Arc>>; + +enum ApplicationJob { + NewChannel((Port, Port)), + NewConnector(ConnectorPDL, Vec), + SyncRound(Vec), + Shutdown, +} + +// ----------------------------------------------------------------------------- +// ConnectorApplication +// ----------------------------------------------------------------------------- + +/// The connector which an application can directly interface with. Once may set +/// up the next synchronous round, and retrieve the data afterwards. +// TODO: Strong candidate for logic reduction in handling put/get. A lot of code +// is an approximate copy-pasta from the regular component logic. I'm going to +// wait until I'm implementing more native components to see which logic is +// truly common. +pub struct ConnectorApplication { + // Communicating about new jobs and setting up sync rounds + sync_done: SyncDone, + job_queue: JobQueue, + is_in_sync: bool, + // Handling current sync round + sync_desc: Vec, + tree: FakeTree, + consensus: Consensus, + last_finished_handled: Option, + branch_extra: Vec, // instruction counter per branch +} + +impl Connector for ConnectorApplication { + fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + if self.is_in_sync { + let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); + let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + self.last_finished_handled = Some(branch_id); + + if let Some(conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + // Can finish sync round immediately + self.collapse_sync_to_conclusion(conclusion, comp_ctx); + return ConnectorScheduling::Immediate; + } + } + + return scheduling; + } else { + return self.run_in_deterministic_mode(sched_ctx, comp_ctx); + } + } +} + +impl ConnectorApplication { + pub(crate) fn new(runtime: Arc) -> (Self, ApplicationInterface) { + let sync_done = Arc::new(( Mutex::new(None), Condvar::new() )); + let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32))); + + let connector = ConnectorApplication { + sync_done: sync_done.clone(), + job_queue: job_queue.clone(), + is_in_sync: false, + sync_desc: Vec::new(), + tree: FakeTree::new(), + consensus: Consensus::new(), + last_finished_handled: None, + branch_extra: vec![0], + }; + let interface = ApplicationInterface::new(sync_done, job_queue, runtime); + + return (connector, interface); + } + + fn handle_new_messages(&mut self, comp_ctx: &mut ComponentCtx) { + while let Some(ticket) = comp_ctx.get_next_message_ticket() { + let message = comp_ctx.read_message_using_ticket(ticket); + if let Message::Data(_) = message { + self.handle_new_data_message(ticket, comp_ctx) + } else { + match comp_ctx.take_message_using_ticket(ticket) { + Message::Data(message) => unreachable!(), + Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx), + Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx), + Message::SyncControl(message) => todo!("implement"), + Message::Control(_) => unreachable!("control message in native API component"), + } + } + } + } + + pub(crate) fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) { + // Go through all branches that are awaiting new messages and see if + // there is one that can receive this message. + if !self.consensus.handle_new_data_message(ticket, ctx) { + // Old message, so drop it + return; + } + + let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); + while let Some(branch_id) = iter_id { + let message = ctx.read_message_using_ticket(ticket).as_data(); + iter_id = self.tree.get_queue_next(branch_id); + + let branch = &self.tree[branch_id]; + if branch.awaiting_port != message.data_header.target_port { continue; } + if !self.consensus.branch_can_receive(branch_id, &message) { continue; } + + // This branch can receive, so fork and given it the message + let receiving_branch_id = self.tree.fork_branch(branch_id); + debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); + self.branch_extra.push(self.branch_extra[branch_id.index as usize]); // copy instruction index + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + let receiving_branch = &mut self.tree[receiving_branch_id]; + + receiving_branch.insert_message(message.data_header.target_port, message.content.clone()); + self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx); + + // And prepare the branch for running + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + } + } + + pub(crate) fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) { + if let Some(conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) { + self.collapse_sync_to_conclusion(conclusion, ctx); + } + } + + pub(crate) fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) { + self.consensus.handle_new_sync_port_message(message, ctx); + } + + fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + debug_assert!(self.is_in_sync); + + self.handle_new_messages(comp_ctx); + + let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); + if branch_id.is_none() { + return ConnectorScheduling::NotNow; + } + + let branch_id = branch_id.unwrap(); + let branch = &mut self.tree[branch_id]; + let mut instruction_idx = self.branch_extra[branch_id.index as usize]; + + if instruction_idx >= self.sync_desc.len() { + // Performed last instruction, so this branch is officially at the + // end of the synchronous interaction. + let consistency = self.consensus.notify_of_finished_branch(branch_id); + if consistency == Consistency::Valid { + branch.sync_state = SpeculativeState::ReachedSyncEnd; + self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); + } else { + branch.sync_state = SpeculativeState::Inconsistent; + } + } else { + // We still have instructions to perform + let cur_instruction = &self.sync_desc[instruction_idx]; + self.branch_extra[branch_id.index as usize] += 1; + + match &cur_instruction { + ApplicationSyncAction::Put(port_id, content) => { + let port_id = *port_id; + + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + let message = Message::Data(DataMessage { + sync_header, + data_header, + content: content.clone(), + }); + comp_ctx.submit_message(message); + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; + }, + ApplicationSyncAction::Get(port_id) => { + let port_id = *port_id; + + branch.sync_state = SpeculativeState::HaltedAtBranchPoint; + branch.awaiting_port = port_id; + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + + let mut any_message_received = false; + for message in comp_ctx.get_read_data_messages(port_id) { + if self.consensus.branch_can_receive(branch_id, &message) { + // This branch can receive the message, so we do the + // fork-and-receive dance + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; + debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); + self.branch_extra.push(instruction_idx + 1); + + branch.insert_message(port_id, message.content.clone()); + + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + + any_message_received = true; + } + } + + if any_message_received { + return ConnectorScheduling::Immediate; + } + } + } + } + + if self.tree.queue_is_empty(QueueKind::Runnable) { + return ConnectorScheduling::NotNow; + } else { + return ConnectorScheduling::Later; + } + } + + fn run_in_deterministic_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { + debug_assert!(!self.is_in_sync); + + // In non-sync mode the application component doesn't really do anything + // except performing jobs submitted from the API. This is the only + // case where we expect to be woken up. + // Note that we have to communicate to the scheduler when we've received + // ports or created components (hence: given away ports) *before* we + // enter a sync round. + let mut queue = self.job_queue.lock().unwrap(); + while let Some(job) = queue.pop_front() { + match job { + ApplicationJob::NewChannel((endpoint_a, endpoint_b)) => { + comp_ctx.push_port(endpoint_a); + comp_ctx.push_port(endpoint_b); + + return ConnectorScheduling::Immediate; + } + ApplicationJob::NewConnector(connector, initial_ports) => { + comp_ctx.push_component(connector, initial_ports); + + return ConnectorScheduling::Later; + }, + ApplicationJob::SyncRound(mut description) => { + // Entering sync mode + comp_ctx.notify_sync_start(); + self.sync_desc = description; + self.is_in_sync = true; + debug_assert!(self.last_finished_handled.is_none()); + debug_assert!(self.branch_extra.len() == 1); + + let first_branch_id = self.tree.start_sync(); + self.tree.push_into_queue(QueueKind::Runnable, first_branch_id); + debug_assert!(first_branch_id.index == 1); + self.consensus.start_sync(comp_ctx); + self.consensus.notify_of_new_branch(BranchId::new_invalid(), first_branch_id); + self.branch_extra.push(0); // set first branch to first instruction + + return ConnectorScheduling::Immediate; + }, + ApplicationJob::Shutdown => { + debug_assert!(queue.is_empty()); + + return ConnectorScheduling::Exit; + } + } + } + + // Queue was empty + return ConnectorScheduling::NotNow; + } + + fn collapse_sync_to_conclusion(&mut self, conclusion: RoundConclusion, comp_ctx: &mut ComponentCtx) { + // Notifying tree, consensus algorithm and context of ending sync + let mut fake_vec = Vec::new(); + + let (branch_id, success) = match conclusion { + RoundConclusion::Success(branch_id) => { + debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program provided by API + (branch_id, true) + }, + RoundConclusion::Failure => (BranchId::new_invalid(), false), + }; + + let mut solution_branch = self.tree.end_sync(branch_id); + self.consensus.end_sync(branch_id, &mut fake_vec); + debug_assert!(fake_vec.is_empty()); + + comp_ctx.notify_sync_end(&[]); + + // Turning hashmapped inbox into vector of values + let mut inbox = Vec::with_capacity(solution_branch.inbox.len()); + for action in &self.sync_desc { + match action { + ApplicationSyncAction::Put(_, _) => {}, + ApplicationSyncAction::Get(port_id) => { + debug_assert!(solution_branch.inbox.contains_key(port_id)); + inbox.push(solution_branch.inbox.remove(port_id).unwrap()); + }, + } + } + + // Notifying interface of ending sync + self.is_in_sync = false; + self.sync_desc.clear(); + self.branch_extra.truncate(1); + self.last_finished_handled = None; + + let (results, notification) = &*self.sync_done; + let mut results = results.lock().unwrap(); + *results = Some(FinishedSync{ success, inbox }); + notification.notify_one(); + } +} + +// ----------------------------------------------------------------------------- +// ApplicationInterface +// ----------------------------------------------------------------------------- + +#[derive(Debug)] +pub enum ChannelCreationError { + InSync, +} + +#[derive(Debug)] +pub enum ApplicationStartSyncError { + AlreadyInSync, + NoSyncActions, + IncorrectPortKind, + UnownedPort, +} + +#[derive(Debug)] +pub enum ApplicationEndSyncError { + NotInSync, + Failure, +} + +pub enum ApplicationSyncAction { + Put(PortIdLocal, ValueGroup), + Get(PortIdLocal), +} + +/// The interface to a `ApplicationConnector`. This allows setting up the +/// interactions the `ApplicationConnector` performs within a synchronous round. +pub struct ApplicationInterface { + sync_done: SyncDone, + job_queue: JobQueue, + runtime: Arc, + is_in_sync: bool, + connector_id: ConnectorId, + owned_ports: Vec<(PortKind, PortIdLocal)>, +} + +impl ApplicationInterface { + fn new(sync_done: SyncDone, job_queue: JobQueue, runtime: Arc) -> Self { + return Self{ + sync_done, job_queue, runtime, + is_in_sync: false, + connector_id: ConnectorId::new_invalid(), + owned_ports: Vec::new(), + } + } + + /// Creates a new channel. Can only fail if the application interface is + /// currently in sync mode. + pub fn create_channel(&mut self) -> Result { + if self.is_in_sync { + return Err(ChannelCreationError::InSync); + } + + let (getter_port, putter_port) = self.runtime.create_channel(self.connector_id); + debug_assert_eq!(getter_port.kind, PortKind::Getter); + let getter_id = getter_port.self_id; + let putter_id = putter_port.self_id; + + { + let mut lock = self.job_queue.lock().unwrap(); + lock.push_back(ApplicationJob::NewChannel((getter_port, putter_port))); + } + + // Add to owned ports for error checking while creating a connector + self.owned_ports.reserve(2); + self.owned_ports.push((PortKind::Putter, putter_id)); + self.owned_ports.push((PortKind::Getter, getter_id)); + + return Ok(Channel{ putter_id, getter_id }); + } + + /// Creates a new connector. Note that it is not scheduled immediately, but + /// depends on the `ApplicationConnector` to run, followed by the created + /// connector being scheduled. + pub fn create_connector(&mut self, module: &str, routine: &str, arguments: ValueGroup) -> Result<(), ComponentCreationError> { + if self.is_in_sync { + return Err(ComponentCreationError::InSync); + } + + // Retrieve ports and make sure that we own the ones that are currently + // specified. This is also checked by the scheduler, but that is done + // asynchronously. + let mut initial_ports = Vec::new(); + find_ports_in_value_group(&arguments, &mut initial_ports); + for initial_port in &initial_ports { + if !self.owned_ports.iter().any(|(_, v)| v == initial_port) { + return Err(ComponentCreationError::UnownedPort); + } + } + + // We own all ports, so remove them on this side + for initial_port in &initial_ports { + let position = self.owned_ports.iter().position(|(_, v)| v == initial_port).unwrap(); + self.owned_ports.remove(position); + } + + let prompt = self.runtime.protocol_description.new_component(module.as_bytes(), routine.as_bytes(), arguments)?; + let connector = ConnectorPDL::new(prompt); + + // Put on job queue + { + let mut queue = self.job_queue.lock().unwrap(); + queue.push_back(ApplicationJob::NewConnector(connector, initial_ports)); + } + + self.wake_up_connector_with_ping(); + + return Ok(()); + } + + /// Queues up a description of a synchronous round to run. Will not actually + /// run the synchronous behaviour in blocking fashion. The results *must* be + /// retrieved using `try_wait` or `wait` for the interface to be considered + /// in non-sync mode. + pub fn perform_sync_round(&mut self, actions: Vec) -> Result<(), ApplicationStartSyncError> { + if self.is_in_sync { + return Err(ApplicationStartSyncError::AlreadyInSync); + } + + // Check the action ports for consistency + for action in &actions { + let (port_id, expected_kind) = match action { + ApplicationSyncAction::Put(port_id, _) => (*port_id, PortKind::Putter), + ApplicationSyncAction::Get(port_id) => (*port_id, PortKind::Getter), + }; + + match self.find_port_by_id(port_id) { + Some(port_kind) => { + if port_kind != expected_kind { + return Err(ApplicationStartSyncError::IncorrectPortKind) + } + }, + None => { + return Err(ApplicationStartSyncError::UnownedPort); + } + } + } + + // Everything is consistent, go into sync mode and send the actions off + // to the component that will actually perform the sync round + self.is_in_sync = true; + { + let (is_done, _) = &*self.sync_done; + let mut lock = is_done.lock().unwrap(); + *lock = None; + } + + { + let mut lock = self.job_queue.lock().unwrap(); + lock.push_back(ApplicationJob::SyncRound(actions)); + } + + self.wake_up_connector_with_ping(); + return Ok(()) + } + + /// Wait until the next sync-round is finished, returning the received + /// messages in order of `get` calls. + pub fn wait(&mut self) -> Result, ApplicationEndSyncError> { + if !self.is_in_sync { + return Err(ApplicationEndSyncError::NotInSync); + } + + let (is_done, condition) = &*self.sync_done; + let mut lock = is_done.lock().unwrap(); + lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done + + self.is_in_sync = false; + let result = lock.take().unwrap(); + if result.success { + return Ok(result.inbox); + } else { + return Err(ApplicationEndSyncError::Failure); + } + } + + /// Called by runtime to set associated connector's ID. + pub(crate) fn set_connector_id(&mut self, id: ConnectorId) { + self.connector_id = id; + } + + fn wake_up_connector_with_ping(&self) { + let message = ControlMessage { + id: 0, + sending_component_id: self.connector_id, + content: ControlContent::Ping, + }; + self.runtime.send_message_maybe_destroyed(self.connector_id, Message::Control(message)); + } + + fn find_port_by_id(&self, port_id: PortIdLocal) -> Option { + return self.owned_ports.iter() + .find(|(_, owned_id)| *owned_id == port_id) + .map(|(port_kind, _)| *port_kind); + } +} + +impl Drop for ApplicationInterface { + fn drop(&mut self) { + { + let mut lock = self.job_queue.lock().unwrap(); + lock.push_back(ApplicationJob::Shutdown); + } + + self.wake_up_connector_with_ping(); + self.runtime.decrement_active_interfaces(); + } +} \ No newline at end of file