diff --git a/src/runtime/connector.rs b/src/runtime/connector.rs
new file mode 100644
index 0000000000000000000000000000000000000000..0b95f378e22907f6cf3479bc9bd7a60176ac6d48
--- /dev/null
+++ b/src/runtime/connector.rs
@@ -0,0 +1,554 @@
+// connector.rs
+//
+// Represents a component. A component (and the scheduler that is running it)
+// has many properties that are not easy to subdivide into aspects that are
+// conceptually handled by particular data structures. That is to say: the code
+// that we run governs: running PDL code, keeping track of ports, instantiating
+// new components and transports (i.e. interacting with the runtime), running
+// a consensus algorithm, etc. But on the other hand, our data is rather
+// simple: we have a speculative execution tree, a set of ports that we own,
+// and a bit of code that we should run.
+//
+// So currently the code is organized as follows:
+// - The scheduler that is running the component is the authoritative source on
+//   ports during *non-sync* mode. The consensus algorithm is the
+//   authoritative source during *sync* mode. They retrieve each other's
+//   state during the transitions. Hence port data exists duplicated between
+//   these two data structures.
+// - The execution tree is where executed branches reside. But the execution
+//   tree is only aware of the tree shape itself (and keeps track of some
+//   queues of branches that are in a particular state), and tends to store
+//   the PDL program state. The consensus algorithm is also somewhat aware
+//   of the execution tree, but only in terms of what is needed to complete
+//   a sync round (for now, that means the port mapping in each branch).
+//   Hence once more we have properties conceptually associated with branches
+//   in two places.
+// - TODO: Write about handling messages, consensus wrapping data
+// - TODO: Write about the way information is exchanged between PDL/component and scheduler through ctx
+
+use std::sync::atomic::AtomicBool;
+
+use crate::ProtocolDescription;
+use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, PortId, ValueGroup};
+use crate::protocol::RunContext;
+
+use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement};
+use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_value_group};
+use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, SyncControlMessage, PublicInbox};
+use super::native::Connector;
+use super::port::{PortKind, PortIdLocal};
+use super::scheduler::{ComponentCtx, SchedulerCtx, MessageTicket};
+
+pub(crate) struct ConnectorPublic {
+    pub inbox: PublicInbox,
+    pub sleeping: AtomicBool,
+}
+
+impl ConnectorPublic {
+    pub fn new(initialize_as_sleeping: bool) -> Self {
+        ConnectorPublic{
+            inbox: PublicInbox::new(),
+            sleeping: AtomicBool::new(initialize_as_sleeping),
+        }
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+enum Mode {
+    NonSync,   // running non-sync code
+    Sync,      // running sync code (in potentially multiple branches)
+    SyncError, // encountered an unrecoverable error in sync mode
+    Error,     // encountered an error in non-sync mode (or finished handling the sync mode error)
+}
+
+#[derive(Debug)]
+pub(crate) enum ConnectorScheduling {
+    Immediate, // Run again, immediately
+    Later,     // Schedule for running, at some later point in time
+    NotNow,    // Do not reschedule for running
+    Exit,      // Connector has exited
+}
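+// The sketch below is illustrative only, not part of the runtime: a minimal
+// model of how a scheduler loop might act on each `ConnectorScheduling`
+// value. The `requeue`/`destroy` closures are hypothetical stand-ins for the
+// real scheduler's queueing and teardown operations.
+#[cfg(test)]
+mod scheduling_dispatch_example {
+    use super::ConnectorScheduling;
+
+    // Returns true if the caller should run the component again right away.
+    fn dispatch(s: ConnectorScheduling, requeue: &mut dyn FnMut(), destroy: &mut dyn FnMut()) -> bool {
+        match s {
+            ConnectorScheduling::Immediate => true,
+            ConnectorScheduling::Later => { requeue(); false },
+            ConnectorScheduling::NotNow => false, // woken up again by an incoming message
+            ConnectorScheduling::Exit => { destroy(); false },
+        }
+    }
+
+    #[test]
+    fn exit_tears_down_the_component() {
+        let mut destroyed = false;
+        let run_again = dispatch(ConnectorScheduling::Exit, &mut || {}, &mut || destroyed = true);
+        assert!(destroyed && !run_again);
+    }
+}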
+pub(crate) struct ConnectorPDL {
+    mode: Mode,
+    eval_error: Option<EvalError>,
+    tree: ExecTree,
+    consensus: Consensus,
+    last_finished_handled: Option<BranchId>,
+}
+
+struct ConnectorRunContext<'a> {
+    branch_id: BranchId,
+    consensus: &'a Consensus,
+    prepared: PreparedStatement,
+}
+
+impl<'a> RunContext for ConnectorRunContext<'a> {
+    fn performed_put(&mut self, _port: PortId) -> bool {
+        return match self.prepared.take() {
+            PreparedStatement::None => false,
+            PreparedStatement::PerformedPut => true,
+            taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken),
+        };
+    }
+
+    fn performed_get(&mut self, _port: PortId) -> Option<ValueGroup> {
+        return match self.prepared.take() {
+            PreparedStatement::None => None,
+            PreparedStatement::PerformedGet(value) => Some(value),
+            taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken),
+        };
+    }
+
+    fn fires(&mut self, port: PortId) -> Option<Value> {
+        todo!("Remove fires() now");
+        let port_id = PortIdLocal::new(port.id);
+        let annotation = self.consensus.get_annotation(self.branch_id, port_id);
+        return annotation.expected_firing.map(|v| Value::Bool(v));
+    }
+
+    fn created_channel(&mut self) -> Option<(Value, Value)> {
+        return match self.prepared.take() {
+            PreparedStatement::None => None,
+            PreparedStatement::CreatedChannel(ports) => Some(ports),
+            taken => unreachable!("prepared statement is '{:?}' during 'created_channel()'", taken),
+        };
+    }
+
+    fn performed_fork(&mut self) -> Option<bool> {
+        return match self.prepared.take() {
+            PreparedStatement::None => None,
+            PreparedStatement::ForkedExecution(path) => Some(path),
+            taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken),
+        };
+    }
+}
+
+impl Connector for ConnectorPDL {
+    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
+        if let Some(scheduling) = self.handle_new_messages(comp_ctx) {
+            return scheduling;
+        }
+
+        match self.mode {
+            Mode::Sync => {
+                // Run in sync mode
+                let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
+
+                // Handle any new finished branches, resuming after the last
+                // one we already handled
+                let mut iter_id = match self.last_finished_handled {
+                    Some(last_handled) => self.tree.get_queue_next(last_handled),
+                    None => self.tree.get_queue_first(QueueKind::FinishedSync),
+                };
+                while let Some(branch_id) = iter_id {
+                    iter_id = self.tree.get_queue_next(branch_id);
+                    self.last_finished_handled = Some(branch_id);
+
+                    if let Some(round_conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
+                        // Actually found a solution
+                        return self.enter_non_sync_mode(round_conclusion, comp_ctx);
+                    }
+                }
+
+                return scheduling;
+            },
+            Mode::NonSync => {
+                let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx);
+                return scheduling;
+            },
+            Mode::SyncError => {
+                let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx);
+                return scheduling;
+            },
+            Mode::Error => {
+                // This shouldn't really be called: once we are in error mode
+                // the scheduler should not run the component anymore
+                unreachable!("called component run() during error-mode");
+            },
+        }
+    }
+}
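+// The test below is illustrative only: it demonstrates the prepared-statement
+// handshake used by `ConnectorRunContext` above. When the scheduler performs
+// an effect on behalf of the PDL code (a put, a fork, a new channel), it
+// stores a marker in the branch; on the next run the `RunContext` hands that
+// marker back to the interpreter exactly once. This assumes `take()` leaves
+// `PreparedStatement::None` behind, as its use throughout this file implies.
+#[cfg(test)]
+mod prepared_statement_example {
+    use super::PreparedStatement;
+
+    #[test]
+    fn prepared_statement_is_consumed_once() {
+        let mut prepared = PreparedStatement::PerformedPut;
+        assert!(matches!(prepared.take(), PreparedStatement::PerformedPut));
+        assert!(matches!(prepared.take(), PreparedStatement::None));
+    }
+}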
+impl ConnectorPDL {
+    pub fn new(initial: Prompt) -> Self {
+        Self{
+            mode: Mode::NonSync,
+            eval_error: None,
+            tree: ExecTree::new(initial),
+            consensus: Consensus::new(),
+            last_finished_handled: None,
+        }
+    }
+
+    // --- Handling messages
+
+    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
+        while let Some(ticket) = ctx.get_next_message_ticket() {
+            let message = ctx.read_message_using_ticket(ticket);
+            let immediate_result = if let Message::Data(_) = message {
+                self.handle_new_data_message(ticket, ctx);
+                None
+            } else {
+                match ctx.take_message_using_ticket(ticket) {
+                    Message::Data(_) => unreachable!(),
+                    Message::SyncComp(message) => {
+                        self.handle_new_sync_comp_message(message, ctx)
+                    },
+                    Message::SyncPort(message) => {
+                        self.handle_new_sync_port_message(message, ctx);
+                        None
+                    },
+                    Message::SyncControl(message) => {
+                        self.handle_new_sync_control_message(message, ctx)
+                    },
+                    Message::Control(_) => unreachable!("control message in component"),
+                }
+            };
+
+            if let Some(result) = immediate_result {
+                return Some(result);
+            }
+        }
+
+        return None;
+    }
+
+    pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) {
+        // Go through all branches that are awaiting new messages and see if
+        // there is one that can receive this message.
+        if !self.consensus.handle_new_data_message(ticket, ctx) {
+            // Message should not be handled now
+            return;
+        }
+
+        let message = ctx.read_message_using_ticket(ticket).as_data();
+        let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage);
+        while let Some(branch_id) = iter_id {
+            iter_id = self.tree.get_queue_next(branch_id);
+
+            let branch = &self.tree[branch_id];
+            if branch.awaiting_port != message.data_header.target_port { continue; }
+            if !self.consensus.branch_can_receive(branch_id, &message) { continue; }
+
+            // This branch can receive, so fork it and give the fork the message
+            let receiving_branch_id = self.tree.fork_branch(branch_id);
+            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
+            let receiving_branch = &mut self.tree[receiving_branch_id];
+
+            debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port);
+            receiving_branch.awaiting_port = PortIdLocal::new_invalid();
+            receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.clone());
+            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);
+
+            // And prepare the branch for running
+            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
+        }
+    }
+
+    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
+        if let Some(round_conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
+            return Some(self.enter_non_sync_mode(round_conclusion, ctx));
+        }
+
+        return None;
+    }
+
+    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
+        self.consensus.handle_new_sync_port_message(message, ctx);
+    }
+
+    pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
+        if let Some(round_conclusion) = self.consensus.handle_new_sync_control_message(message, ctx) {
+            return Some(self.enter_non_sync_mode(round_conclusion, ctx));
+        }
+
+        return None;
+    }
+}
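+// The test below is illustrative only: a much-simplified model of the
+// fork-and-receive dance in `handle_new_data_message`. The real code forks a
+// branch in the `ExecTree` and hands the message over as a
+// `PreparedStatement::PerformedGet`; here plain structs stand in for real
+// branches so the shape of the algorithm stands on its own.
+#[cfg(test)]
+mod fork_on_receive_example {
+    #[derive(Clone)]
+    struct MiniBranch { awaiting_port: u32, received: Option<String> }
+
+    #[test]
+    fn receiving_forks_the_blocked_branch() {
+        let mut branches = vec![MiniBranch{ awaiting_port: 5, received: None }];
+        let (target_port, content) = (5_u32, String::from("payload"));
+
+        // Mirror of the loop above: every branch blocked on the target port
+        // is forked; the fork receives the message and becomes runnable,
+        // while the original keeps waiting for other candidate messages.
+        let mut forks = Vec::new();
+        for branch in &branches {
+            if branch.awaiting_port == target_port {
+                let mut fork = branch.clone();
+                fork.received = Some(content.clone());
+                forks.push(fork);
+            }
+        }
+        branches.extend(forks);
+
+        assert_eq!(branches.len(), 2);
+        assert!(branches[0].received.is_none() && branches[1].received.is_some());
+    }
+}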
+impl ConnectorPDL {
+    // --- Running code
+
+    pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
+        // Check if we have any branch that needs running
+        debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync());
+        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
+        if branch_id.is_none() {
+            return ConnectorScheduling::NotNow;
+        }
+
+        // Retrieve the branch and run it
+        let branch_id = branch_id.unwrap();
+        let branch = &mut self.tree[branch_id];
+
+        let mut run_context = ConnectorRunContext{
+            branch_id,
+            consensus: &self.consensus,
+            prepared: branch.prepared.take(),
+        };
+
+        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
+        if let Err(eval_error) = run_result {
+            self.eval_error = Some(eval_error);
+            self.mode = Mode::SyncError;
+            if let Some(conclusion) = self.consensus.notify_of_fatal_branch(branch_id, comp_ctx) {
+                // We can exit immediately
+                return self.enter_non_sync_mode(conclusion, comp_ctx);
+            } else {
+                // Current branch failed. But we may have other branches that
+                // are still running.
+                return ConnectorScheduling::Immediate;
+            }
+        }
+        let run_result = run_result.unwrap();
+
+        // Handle the returned result. Note that this match statement contains
+        // explicit returns in case the run result requires that the
+        // component's code is run again immediately.
+        match run_result {
+            EvalContinuation::BranchInconsistent => {
+                // Branch became inconsistent
+                branch.sync_state = SpeculativeState::Inconsistent;
+            },
+            EvalContinuation::BlockFires(port_id) => {
+                // Branch called `fires()` on a port that has not been used yet.
+                let port_id = PortIdLocal::new(port_id.id);
+
+                // Create two forks, one that assumes the port will fire, and
+                // one that assumes the port remains silent
+                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
+
+                let firing_branch_id = self.tree.fork_branch(branch_id);
+                let silent_branch_id = self.tree.fork_branch(branch_id);
+                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
+                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true, comp_ctx);
+                debug_assert_eq!(_result, Consistency::Valid);
+                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
+                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false, comp_ctx);
+                debug_assert_eq!(_result, Consistency::Valid);
+
+                // Somewhat important: we push the firing branch first, so
+                // that it is run again immediately.
+                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
+                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);
+
+                return ConnectorScheduling::Immediate;
+            },
+            EvalContinuation::BlockGet(port_id) => {
+                // Branch performed a `get()` on a port for which no message
+                // has been received yet.
+                let port_id = PortIdLocal::new(port_id.id);
+
+                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
+                branch.awaiting_port = port_id;
+                self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
+
+                // Note: we only know that a branch is waiting on a message when
+                // it reaches the `get` call. But we might have already received
+                // a message that targets this branch, so check now.
+                let mut any_message_received = false;
+                for message in comp_ctx.get_read_data_messages(port_id) {
+                    if self.consensus.branch_can_receive(branch_id, &message) {
+                        // This branch can receive the message, so we do the
+                        // fork-and-receive dance
+                        let receiving_branch_id = self.tree.fork_branch(branch_id);
+                        let branch = &mut self.tree[receiving_branch_id];
+                        branch.awaiting_port = PortIdLocal::new_invalid();
+                        branch.prepared = PreparedStatement::PerformedGet(message.content.clone());
+
+                        self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
+                        self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);
+                        self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
+
+                        any_message_received = true;
+                    }
+                }
+
+                if any_message_received {
+                    return ConnectorScheduling::Immediate;
+                }
+            },
+            EvalContinuation::SyncBlockEnd => {
+                let consistency = self.consensus.notify_of_finished_branch(branch_id);
+                if consistency == Consistency::Valid {
+                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
+                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
+                } else {
+                    branch.sync_state = SpeculativeState::Inconsistent;
+                }
+            },
+            EvalContinuation::NewFork => {
+                // Like the `NewChannel` result. This means we're setting up
+                // a branch and putting a marker inside the RunContext for the
+                // next time we run the PDL code
+                let left_id = branch_id;
+                let right_id = self.tree.fork_branch(left_id);
+                self.consensus.notify_of_new_branch(left_id, right_id);
+                self.tree.push_into_queue(QueueKind::Runnable, left_id);
+                self.tree.push_into_queue(QueueKind::Runnable, right_id);
+
+                let left_branch = &mut self.tree[left_id];
+                left_branch.prepared = PreparedStatement::ForkedExecution(true);
+                let right_branch = &mut self.tree[right_id];
+                right_branch.prepared = PreparedStatement::ForkedExecution(false);
+            },
+            EvalContinuation::Put(port_id, content) => {
+                // Branch is attempting to send data
+                let port_id = PortIdLocal::new(port_id.id);
+                let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx);
+                let message = DataMessage{ sync_header, data_header, content };
+                match comp_ctx.submit_message(Message::Data(message)) {
+                    Ok(_) => {
+                        // Message is underway
+                        branch.prepared = PreparedStatement::PerformedPut;
+                        self.tree.push_into_queue(QueueKind::Runnable, branch_id);
+                        return ConnectorScheduling::Immediate;
+                    },
+                    Err(_) => {
+                        // We don't own the port
+                        let pd = &sched_ctx.runtime.protocol_description;
+                        let eval_error = branch.code_state.new_error_at_expr(
+                            &pd.modules, &pd.heap,
+                            String::from("attempted to 'put' on port that is no longer owned")
+                        );
+                        self.eval_error = Some(eval_error);
+                        self.mode = Mode::SyncError;
+
+                        if let Some(conclusion) = self.consensus.notify_of_fatal_branch(branch_id, comp_ctx) {
+                            return self.enter_non_sync_mode(conclusion, comp_ctx);
+                        }
+                    }
+                }
+            },
+            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
+        }
+
+        // If here then the run result did not require a particular action. We
+        // return whether we have more active branches to run or not.
+        if self.tree.queue_is_empty(QueueKind::Runnable) {
+            return ConnectorScheduling::NotNow;
+        } else {
+            return ConnectorScheduling::Later;
+        }
+    }
+}
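+// The test below is illustrative only: a toy model of the speculation that
+// `run_in_sync_mode` performs for `BlockFires`. A branch halting at `fires()`
+// is forked into a variant that assumes the port fires and one that assumes
+// it stays silent; only branches whose assumption matches what actually
+// happened on the port can be part of a consistent round. The plain tuples
+// here are hypothetical stand-ins for real `ExecTree` branches and consensus
+// annotations.
+#[cfg(test)]
+mod speculation_example {
+    #[test]
+    fn only_consistent_branches_survive() {
+        // (branch id, assumed firing) pairs, as produced by the fork at `fires()`
+        let branches = vec![(1_u32, true), (2_u32, false)];
+        let port_actually_fired = true;
+
+        let consistent: Vec<u32> = branches.into_iter()
+            .filter(|(_, assumed)| *assumed == port_actually_fired)
+            .map(|(id, _)| id)
+            .collect();
+
+        assert_eq!(consistent, vec![1]);
+    }
+}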
+impl ConnectorPDL {
+    pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
+        debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync());
+
+        let branch = self.tree.base_branch_mut();
+        debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync);
+
+        let mut run_context = ConnectorRunContext{
+            branch_id: branch.id,
+            consensus: &self.consensus,
+            prepared: branch.prepared.take(),
+        };
+        let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context);
+        if let Err(eval_error) = run_result {
+            comp_ctx.push_error(eval_error);
+            return ConnectorScheduling::Exit;
+        }
+        let run_result = run_result.unwrap();
+
+        match run_result {
+            EvalContinuation::ComponentTerminated => {
+                branch.sync_state = SpeculativeState::Finished;
+                return ConnectorScheduling::Exit;
+            },
+            EvalContinuation::SyncBlockStart => {
+                comp_ctx.notify_sync_start();
+                let sync_branch_id = self.tree.start_sync();
+                debug_assert!(self.last_finished_handled.is_none());
+                self.consensus.start_sync(comp_ctx);
+                self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
+                self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
+                self.mode = Mode::Sync;
+
+                return ConnectorScheduling::Immediate;
+            },
+            EvalContinuation::NewComponent(definition_id, monomorph_idx, arguments) => {
+                // Note: we're relinquishing ownership of ports. But because
+                // we are in non-sync mode the scheduler will handle and check
+                // port ownership transfer.
+                debug_assert!(comp_ctx.workspace_ports.is_empty());
+                find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports);
+
+                let new_prompt = Prompt::new(
+                    &sched_ctx.runtime.protocol_description.types,
+                    &sched_ctx.runtime.protocol_description.heap,
+                    definition_id, monomorph_idx, arguments
+                );
+                let new_component = ConnectorPDL::new(new_prompt);
+                comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone());
+                comp_ctx.workspace_ports.clear();
+
+                return ConnectorScheduling::Later;
+            },
+            EvalContinuation::NewChannel => {
+                let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id);
+                debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter);
+                branch.prepared = PreparedStatement::CreatedChannel((
+                    Value::Output(PortId::new(putter.self_id.index)),
+                    Value::Input(PortId::new(getter.self_id.index)),
+                ));
+
+                comp_ctx.push_port(putter);
+                comp_ctx.push_port(getter);
+
+                return ConnectorScheduling::Immediate;
+            },
+            _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result),
+        }
+    }
+}
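+// The test below is illustrative only: it captures the mode transitions that
+// `run_in_deterministic_mode` and `enter_non_sync_mode` drive. A
+// `SyncBlockStart` moves NonSync to Sync; a successful round conclusion moves
+// Sync (or SyncError) back to NonSync; a failed round ends in Error, after
+// which the component exits. The `conclude` helper is a hypothetical
+// condensation of that logic, not a runtime function.
+#[cfg(test)]
+mod mode_transition_example {
+    use super::Mode;
+
+    fn conclude(mode: Mode, round_succeeded: bool) -> Mode {
+        match (mode, round_succeeded) {
+            (Mode::Sync, true) | (Mode::SyncError, true) => Mode::NonSync,
+            (Mode::Sync, false) | (Mode::SyncError, false) => Mode::Error,
+            (other, _) => other,
+        }
+    }
+
+    #[test]
+    fn failed_round_leads_to_exit() {
+        assert_eq!(conclude(Mode::Sync, false), Mode::Error);
+        assert_eq!(conclude(Mode::SyncError, true), Mode::NonSync);
+    }
+}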
+impl ConnectorPDL {
+    /// Helper that moves the component's state back into non-sync mode, using
+    /// the branch of the provided round conclusion as the branch that should
+    /// be committed to memory. If this returns `ConnectorScheduling::Exit`,
+    /// then the component is supposed to exit.
+    fn enter_non_sync_mode(&mut self, conclusion: RoundConclusion, ctx: &mut ComponentCtx) -> ConnectorScheduling {
+        debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError);
+
+        // Depending on local state decide what to do
+        let final_branch_id = match conclusion {
+            RoundConclusion::Success(branch_id) => Some(branch_id),
+            RoundConclusion::Failure => None,
+        };
+
+        if let Some(solution_branch_id) = final_branch_id {
+            let mut fake_vec = Vec::new();
+            self.tree.end_sync(solution_branch_id);
+            self.consensus.end_sync(solution_branch_id, &mut fake_vec);
+            debug_assert!(fake_vec.is_empty());
+
+            ctx.notify_sync_end(&[]);
+            self.last_finished_handled = None;
+            self.eval_error = None; // in case we came from the SyncError mode
+            self.mode = Mode::NonSync;
+
+            return ConnectorScheduling::Immediate;
+        } else {
+            // No final branch, because we're supposed to exit!
+            self.last_finished_handled = None;
+            self.mode = Mode::Error;
+            if let Some(eval_error) = self.eval_error.take() {
+                ctx.push_error(eval_error);
+            }
+
+            return ConnectorScheduling::Exit;
+        }
+    }
+
+    /// Runs the prompt repeatedly until some kind of execution-blocking
+    /// condition appears.
+    #[inline]
+    fn run_prompt(prompt: &mut Prompt, pd: &ProtocolDescription, ctx: &mut ConnectorRunContext) -> Result<EvalContinuation, EvalError> {
+        loop {
+            let result = prompt.step(&pd.types, &pd.heap, &pd.modules, ctx);
+            if let Ok(EvalContinuation::Stepping) = result {
+                continue;
+            }
+
+            return result;
+        }
+    }
+}
\ No newline at end of file