// connector.rs
//
// Represents a component. A component (and the scheduler that is running it)
// has many properties that are not easy to subdivide into aspects that are
// conceptually handled by particular data structures. That is to say: the code
// that we run governs: running PDL code, keeping track of ports, instantiating
// new components and transports (i.e. interacting with the runtime), running
// a consensus algorithm, etc. But on the other hand, our data is rather
// simple: we have a speculative execution tree, a set of ports that we own,
// and a bit of code that we should run.
//
// So currently the code is organized as follows:
// - The scheduler that is running the component is the authoritative source on
//   ports during *non-sync* mode. The consensus algorithm is the
//   authoritative source during *sync* mode. They retrieve each other's
//   state during the transitions. Hence port data exists duplicated between
//   these two data structures.
// - The execution tree is where executed branches reside. But the execution
//   tree is only aware of the tree shape itself (and keeps track of some
//   queues of branches that are in a particular state), and tends to store
//   the PDL program state. The consensus algorithm is also somewhat aware
//   of the execution tree, but only in terms of what is needed to complete
//   a sync round (for now, that means the port mapping in each branch).
//   Hence once more we have properties conceptually associated with branches
//   in two places.
// - TODO: Write about handling messages, consensus wrapping data // - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx use std::sync::atomic::AtomicBool; use crate::ProtocolDescription; use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, PortId, ValueGroup}; use crate::protocol::RunContext; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement}; use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_value_group}; use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, SyncControlMessage, PublicInbox}; use super::native::Connector; use super::port::{PortKind, PortIdLocal}; use super::scheduler::{ComponentCtx, SchedulerCtx, MessageTicket}; pub(crate) struct ConnectorPublic { pub inbox: PublicInbox, pub sleeping: AtomicBool, } impl ConnectorPublic { pub fn new(initialize_as_sleeping: bool) -> Self { ConnectorPublic{ inbox: PublicInbox::new(), sleeping: AtomicBool::new(initialize_as_sleeping), } } } #[derive(Debug, PartialEq, Eq, Clone, Copy)] enum Mode { NonSync, // running non-sync code Sync, // running sync code (in potentially multiple branches) SyncError, // encountered an unrecoverable error in sync mode Error, // encountered an error in non-sync mode (or finished handling the sync mode error). 
} #[derive(Debug)] pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately Later, // Schedule for running, at some later point in time NotNow, // Do not reschedule for running Exit, // Connector has exited } pub(crate) struct ConnectorPDL { mode: Mode, eval_error: Option, tree: ExecTree, consensus: Consensus, last_finished_handled: Option, } struct ConnectorRunContext<'a> { branch_id: BranchId, consensus: &'a Consensus, prepared: PreparedStatement, } impl<'a> RunContext for ConnectorRunContext<'a>{ fn performed_put(&mut self, _port: PortId) -> bool { return match self.prepared.take() { PreparedStatement::None => false, PreparedStatement::PerformedPut => true, taken => unreachable!("prepared statement is '{:?}' during 'performed_put()'", taken) }; } fn performed_get(&mut self, _port: PortId) -> Option { return match self.prepared.take() { PreparedStatement::None => None, PreparedStatement::PerformedGet(value) => Some(value), taken => unreachable!("prepared statement is '{:?}' during 'performed_get()'", taken), }; } fn fires(&mut self, _port: PortId) -> Option { todo!("Remove fires() now") // let port_id = PortIdLocal::new(port.id); // let annotation = self.consensus.get_annotation(self.branch_id, port_id); // return annotation.expected_firing.map(|v| Value::Bool(v)); } fn created_channel(&mut self) -> Option<(Value, Value)> { return match self.prepared.take() { PreparedStatement::None => None, PreparedStatement::CreatedChannel(ports) => Some(ports), taken => unreachable!("prepared statement is '{:?}' during 'created_channel()'", taken), }; } fn performed_fork(&mut self) -> Option { return match self.prepared.take() { PreparedStatement::None => None, PreparedStatement::ForkedExecution(path) => Some(path), taken => unreachable!("prepared statement is '{:?}' during 'performed_fork()'", taken), }; } fn performed_select_wait(&mut self) -> Option { unreachable!() } } impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: 
&mut ComponentCtx) -> ConnectorScheduling { if let Some(scheduling) = self.handle_new_messages(comp_ctx) { return scheduling; } match self.mode { Mode::Sync => { // Run in sync mode let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); // Handle any new finished branches let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); while let Some(branch_id) = iter_id { iter_id = self.tree.get_queue_next(branch_id); self.last_finished_handled = Some(branch_id); if let Some(round_conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { // Actually found a solution return self.enter_non_sync_mode(round_conclusion, comp_ctx); } self.last_finished_handled = Some(branch_id); } return scheduling; }, Mode::NonSync => { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); return scheduling; }, Mode::SyncError => { let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); return scheduling; }, Mode::Error => { // This shouldn't really be called. 
Because when we reach exit // mode the scheduler should not run the component anymore unreachable!("called component run() during error-mode"); }, } } } impl ConnectorPDL { pub fn new(initial: Prompt) -> Self { Self{ mode: Mode::NonSync, eval_error: None, tree: ExecTree::new(initial), consensus: Consensus::new(), last_finished_handled: None, } } // --- Handling messages pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) -> Option { while let Some(ticket) = ctx.get_next_message_ticket() { let message = ctx.read_message_using_ticket(ticket); let immediate_result = if let Message::Data(_) = message { self.handle_new_data_message(ticket, ctx); None } else { match ctx.take_message_using_ticket(ticket) { Message::Data(_) => unreachable!(), Message::SyncComp(message) => { self.handle_new_sync_comp_message(message, ctx) }, Message::SyncPort(message) => { self.handle_new_sync_port_message(message, ctx); None }, Message::SyncControl(message) => { self.handle_new_sync_control_message(message, ctx) }, Message::Control(_) => unreachable!("control message in component"), } }; if let Some(result) = immediate_result { return Some(result); } } return None; } pub fn handle_new_data_message(&mut self, ticket: MessageTicket, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. 
if !self.consensus.handle_new_data_message(ticket, ctx) { // Message should not be handled now return; } let message = ctx.read_message_using_ticket(ticket).as_data(); let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); while let Some(branch_id) = iter_id { iter_id = self.tree.get_queue_next(branch_id); let branch = &self.tree[branch_id]; if branch.awaiting_port != message.data_header.target_port { continue; } if !self.consensus.branch_can_receive(branch_id, &message) { continue; } // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port); receiving_branch.awaiting_port = PortIdLocal::new_invalid(); receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.clone()); self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); } } pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option { if let Some(round_conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) { return Some(self.enter_non_sync_mode(round_conclusion, ctx)); } return None; } pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) { self.consensus.handle_new_sync_port_message(message, ctx); } pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option { if let Some(round_conclusion) = self.consensus.handle_new_sync_control_message(message, ctx) { return Some(self.enter_non_sync_mode(round_conclusion, ctx)); } return None; } // --- Running code pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: 
&mut ComponentCtx) -> ConnectorScheduling { // Check if we have any branch that needs running debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync()); let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); if branch_id.is_none() { return ConnectorScheduling::NotNow; } // Retrieve the branch and run it let branch_id = branch_id.unwrap(); let branch = &mut self.tree[branch_id]; let mut run_context = ConnectorRunContext{ branch_id, consensus: &self.consensus, prepared: branch.prepared.take(), }; let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context); if let Err(eval_error) = run_result { self.eval_error = Some(eval_error); self.mode = Mode::SyncError; if let Some(conclusion) = self.consensus.notify_of_fatal_branch(branch_id, comp_ctx) { // We can exit immediately return self.enter_non_sync_mode(conclusion, comp_ctx); } else { // Current branch failed. But we may have other things that are // running. return ConnectorScheduling::Immediate; } } let run_result = run_result.unwrap(); // Handle the returned result. Note that this match statement contains // explicit returns in case the run result requires that the component's // code is ran again immediately match run_result { EvalContinuation::BranchInconsistent => { // Branch became inconsistent branch.sync_state = SpeculativeState::Inconsistent; }, EvalContinuation::BlockFires(port_id) => { // Branch called `fires()` on a port that has not been used yet. 
let port_id = PortIdLocal::new(port_id.id); // Create two forks, one that assumes the port will fire, and // one that assumes the port remains silent branch.sync_state = SpeculativeState::HaltedAtBranchPoint; let firing_branch_id = self.tree.fork_branch(branch_id); let silent_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, firing_branch_id); let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true, comp_ctx); debug_assert_eq!(_result, Consistency::Valid); self.consensus.notify_of_new_branch(branch_id, silent_branch_id); let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false, comp_ctx); debug_assert_eq!(_result, Consistency::Valid); // Somewhat important: we push the firing one first, such that // that branch is ran again immediately. self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id); self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id); return ConnectorScheduling::Immediate; }, EvalContinuation::BlockGet(_expr_id, port_id) => { // Branch performed a `get()` on a port that does not have a // received message on that port. let port_id = PortIdLocal::new(port_id.id); branch.sync_state = SpeculativeState::HaltedAtBranchPoint; branch.awaiting_port = port_id; self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); // Note: we only know that a branch is waiting on a message when // it reaches the `get` call. But we might have already received // a message that targets this branch, so check now. 
let mut any_message_received = false; for message in comp_ctx.get_read_data_messages(port_id) { if self.consensus.branch_can_receive(branch_id, &message) { // This branch can receive the message, so we do the // fork-and-receive dance let receiving_branch_id = self.tree.fork_branch(branch_id); let branch = &mut self.tree[receiving_branch_id]; branch.awaiting_port = PortIdLocal::new_invalid(); branch.prepared = PreparedStatement::PerformedGet(message.content.clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx); self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_message_received = true; } } if any_message_received { return ConnectorScheduling::Immediate; } } EvalContinuation::SyncBlockEnd => { let consistency = self.consensus.notify_of_finished_branch(branch_id); if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); } else { branch.sync_state = SpeculativeState::Inconsistent; } }, EvalContinuation::NewFork => { // Like the `NewChannel` result. 
This means we're setting up // a branch and putting a marker inside the RunContext for the // next time we run the PDL code let left_id = branch_id; let right_id = self.tree.fork_branch(left_id); self.consensus.notify_of_new_branch(left_id, right_id); self.tree.push_into_queue(QueueKind::Runnable, left_id); self.tree.push_into_queue(QueueKind::Runnable, right_id); let left_branch = &mut self.tree[left_id]; left_branch.prepared = PreparedStatement::ForkedExecution(true); let right_branch = &mut self.tree[right_id]; right_branch.prepared = PreparedStatement::ForkedExecution(false); } EvalContinuation::Put(_expr_id, port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.id); let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); let message = DataMessage{ sync_header, data_header, content }; match comp_ctx.submit_message(Message::Data(message)) { Ok(_) => { // Message is underway branch.prepared = PreparedStatement::PerformedPut; self.tree.push_into_queue(QueueKind::Runnable, branch_id); return ConnectorScheduling::Immediate; }, Err(_) => { // We don't own the port let pd = &sched_ctx.runtime.protocol_description; let eval_error = branch.code_state.new_error_at_expr( &pd.modules, &pd.heap, String::from("attempted to 'put' on port that is no longer owned") ); self.eval_error = Some(eval_error); self.mode = Mode::SyncError; if let Some(conclusion) = self.consensus.notify_of_fatal_branch(branch_id, comp_ctx) { return self.enter_non_sync_mode(conclusion, comp_ctx); } } } }, _ => unreachable!("unexpected run result {:?} in sync mode", run_result), } // If here then the run result did not require a particular action. We // return whether we have more active branches to run or not. 
if self.tree.queue_is_empty(QueueKind::Runnable) { return ConnectorScheduling::NotNow; } else { return ConnectorScheduling::Later; } } pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync()); let branch = self.tree.base_branch_mut(); debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); let mut run_context = ConnectorRunContext{ branch_id: branch.id, consensus: &self.consensus, prepared: branch.prepared.take(), }; let run_result = Self::run_prompt(&mut branch.code_state, &sched_ctx.runtime.protocol_description, &mut run_context); if let Err(eval_error) = run_result { comp_ctx.push_error(eval_error); return ConnectorScheduling::Exit } let run_result = run_result.unwrap(); match run_result { EvalContinuation::ComponentTerminated => { branch.sync_state = SpeculativeState::Finished; return ConnectorScheduling::Exit; }, EvalContinuation::SyncBlockStart => { comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); debug_assert!(self.last_finished_handled.is_none()); self.consensus.start_sync(comp_ctx); self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); self.mode = Mode::Sync; return ConnectorScheduling::Immediate; }, EvalContinuation::NewComponent(definition_id, type_id, arguments) => { // Note: we're relinquishing ownership of ports. But because // we are in non-sync mode the scheduler will handle and check // port ownership transfer. 
debug_assert!(comp_ctx.workspace_ports.is_empty()); find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports); let new_prompt = Prompt::new( &sched_ctx.runtime.protocol_description.types, &sched_ctx.runtime.protocol_description.heap, definition_id, type_id, arguments ); let new_component = ConnectorPDL::new(new_prompt); comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone()); comp_ctx.workspace_ports.clear(); return ConnectorScheduling::Later; }, EvalContinuation::NewChannel => { let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); branch.prepared = PreparedStatement::CreatedChannel(( Value::Output(PortId::new(putter.self_id.index)), Value::Input(PortId::new(getter.self_id.index)), )); comp_ctx.push_port(putter); comp_ctx.push_port(getter); return ConnectorScheduling::Immediate; }, _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), } } /// Helper that moves the component's state back into non-sync mode, using /// the provided solution branch ID as the branch that should be comitted to /// memory. If this function returns false, then the component is supposed /// to exit. 
fn enter_non_sync_mode(&mut self, conclusion: RoundConclusion, ctx: &mut ComponentCtx) -> ConnectorScheduling { debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError); // Depending on local state decide what to do let final_branch_id = match conclusion { RoundConclusion::Success(branch_id) => Some(branch_id), RoundConclusion::Failure => None, }; if let Some(solution_branch_id) = final_branch_id { let mut fake_vec = Vec::new(); self.tree.end_sync(solution_branch_id); self.consensus.end_sync(solution_branch_id, &mut fake_vec); debug_assert!(fake_vec.is_empty()); ctx.notify_sync_end(&[]); self.last_finished_handled = None; self.eval_error = None; // in case we came from the SyncError mode self.mode = Mode::NonSync; return ConnectorScheduling::Immediate; } else { // No final branch, because we're supposed to exit! self.last_finished_handled = None; self.mode = Mode::Error; if let Some(eval_error) = self.eval_error.take() { ctx.push_error(eval_error); } return ConnectorScheduling::Exit; } } /// Runs the prompt repeatedly until some kind of execution-blocking /// condition appears. #[inline] fn run_prompt(prompt: &mut Prompt, pd: &ProtocolDescription, ctx: &mut ConnectorRunContext) -> Result { loop { let result = prompt.step(&pd.types, &pd.heap, &pd.modules, ctx); if let Ok(EvalContinuation::Stepping) = result { continue; } return result; } } }