// connector.rs
//
// Represents a component. A component (and the scheduler that is running it)
// has many properties that are not easy to subdivide into aspects that are
// conceptually handled by particular data structures. That is to say: the code
// that we run governs: running PDL code, keeping track of ports, instantiating
// new components and transports (i.e. interacting with the runtime), running
// a consensus algorithm, etc. But on the other hand, our data is rather
// simple: we have a speculative execution tree, a set of ports that we own,
// and a bit of code that we should run.
//
// So currently the code is organized as follows:
// - The scheduler that is running the component is the authoritative source on
//   ports during *non-sync* mode. The consensus algorithm is the
//   authoritative source during *sync* mode. They retrieve each other's
//   state during the transitions. Hence port data exists duplicated between
//   these two data structures.
// - The execution tree is where executed branches reside. But the execution
//   tree is only aware of the tree shape itself (and keeps track of some
//   queues of branches that are in a particular state), and tends to store
//   the PDL program state. The consensus algorithm is also somewhat aware
//   of the execution tree, but only in terms of what is needed to complete
//   a sync round (for now, that means the port mapping in each branch).
//   Hence once more we have properties conceptually associated with branches
//   in two places.
// - TODO: Write about handling messages, consensus wrapping data // - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx use std::collections::HashMap; use std::sync::atomic::AtomicBool; use crate::PortId; use crate::common::ComponentState; use crate::protocol::eval::{Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox}; use super::native::Connector; use super::port::{PortKind, PortIdLocal}; use super::scheduler::{ComponentCtx, SchedulerCtx}; pub(crate) struct ConnectorPublic { pub inbox: PublicInbox, pub sleeping: AtomicBool, } impl ConnectorPublic { pub fn new(initialize_as_sleeping: bool) -> Self { ConnectorPublic{ inbox: PublicInbox::new(), sleeping: AtomicBool::new(initialize_as_sleeping), } } } #[derive(Eq, PartialEq)] pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately Later, // Schedule for running, at some later point in time NotNow, // Do not reschedule for running Exit, // Connector has exited } pub(crate) struct ConnectorPDL { tree: ExecTree, consensus: Consensus, } struct ConnectorRunContext<'a> { branch_id: BranchId, consensus: &'a Consensus, received: &'a HashMap, scheduler: SchedulerCtx<'a>, prepared_channel: Option<(Value, Value)>, } impl<'a> RunContext for ConnectorRunContext<'a>{ fn did_put(&mut self, port: PortId) -> bool { let port_id = PortIdLocal::new(port.0.u32_suffix); let annotation = self.consensus.get_annotation(self.branch_id, port_id); return annotation.registered_id.is_some(); } fn get(&mut self, port: PortId) -> Option { let port_id = PortIdLocal::new(port.0.u32_suffix); match self.received.get(&port_id) { Some(data) => Some(data.clone()), None => None, } } fn fires(&mut self, port: PortId) -> Option { let port_id = 
PortIdLocal::new(port.0.u32_suffix); let annotation = self.consensus.get_annotation(self.branch_id, port_id); return annotation.expected_firing.map(|v| Value::Bool(v)); } fn get_channel(&mut self) -> Option<(Value, Value)> { return self.prepared_channel.take(); } } impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { self.handle_new_messages(comp_ctx); if self.tree.is_in_sync() { let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); return ConnectorScheduling::Immediate; } else { return scheduling } } else { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); return scheduling; } } } impl ConnectorPDL { pub fn new(initial: ComponentState) -> Self { Self{ tree: ExecTree::new(initial), consensus: Consensus::new(), } } // --- Handling messages pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) { while let Some(message) = ctx.read_next_message() { match message { Message::Data(message) => self.handle_new_data_message(message, ctx), Message::Sync(message) => self.handle_new_sync_message(message, ctx), Message::Control(_) => unreachable!("control message in component"), } } } pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. debug_assert!(ctx.workspace_branches.is_empty()); let mut branches = Vec::new(); // TODO: @Remove self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches); for branch_id in branches.drain(..) 
{ // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); } } pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { self.collapse_sync_to_solution_branch(solution_branch_id, ctx); } } // --- Running code pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { // Check if we have any branch that needs running debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync()); let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); if branch_id.is_none() { return ConnectorScheduling::NotNow; } // Retrieve the branch and run it let branch_id = branch_id.unwrap(); let branch = &mut self.tree[branch_id]; let mut run_context = ConnectorRunContext{ branch_id, consensus: &self.consensus, received: &branch.inbox, scheduler: sched_ctx, prepared_channel: branch.prepared_channel.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); // Handle the returned result. 
Note that this match statement contains // explicit returns in case the run result requires that the component's // code is ran again immediately match run_result { RunResult::BranchInconsistent => { // Branch became inconsistent branch.sync_state = SpeculativeState::Inconsistent; }, RunResult::BranchMissingPortState(port_id) => { // Branch called `fires()` on a port that has not been used yet. let port_id = PortIdLocal::new(port_id.0.u32_suffix); // Create two forks, one that assumes the port will fire, and // one that assumes the port remains silent branch.sync_state = SpeculativeState::HaltedAtBranchPoint; let firing_branch_id = self.tree.fork_branch(branch_id); let silent_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, firing_branch_id); let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true); debug_assert_eq!(_result, Consistency::Valid); self.consensus.notify_of_new_branch(branch_id, silent_branch_id); let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false); debug_assert_eq!(_result, Consistency::Valid); // Somewhat important: we push the firing one first, such that // that branch is ran again immediately. self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id); self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id); return ConnectorScheduling::Immediate; }, RunResult::BranchMissingPortValue(port_id) => { // Branch performed a `get()` on a port that does not have a // received message on that port. 
let port_id = PortIdLocal::new(port_id.0.u32_suffix); let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); if consistency == Consistency::Valid { // `get()` is valid, so mark the branch as awaiting a message branch.sync_state = SpeculativeState::HaltedAtBranchPoint; branch.awaiting_port = port_id; self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); // Note: we only know that a branch is waiting on a message when // it reaches the `get` call. But we might have already received // a message that targets this branch, so check now. let mut any_branch_received = false; for message in comp_ctx.get_read_data_messages(port_id) { if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) { // This branch can receive the message, so we do the // fork-and-receive dance let receiving_branch_id = self.tree.fork_branch(branch_id); let branch = &mut self.tree[receiving_branch_id]; branch.insert_message(port_id, message.content.as_message().unwrap().clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_branch_received = true; } } if any_branch_received { return ConnectorScheduling::Immediate; } } else { branch.sync_state = SpeculativeState::Inconsistent; } } RunResult::BranchAtSyncEnd => { let consistency = self.consensus.notify_of_finished_branch(branch_id); if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); } else if consistency == Consistency::Inconsistent { branch.sync_state = SpeculativeState::Inconsistent; } }, RunResult::BranchPut(port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); let consistency = 
self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); if consistency == Consistency::Valid { // `put()` is valid. let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); comp_ctx.submit_message(Message::Data(DataMessage { sync_header, data_header, content: DataContent::Message(content), })); self.tree.push_into_queue(QueueKind::Runnable, branch_id); return ConnectorScheduling::Immediate; } else { branch.sync_state = SpeculativeState::Inconsistent; } }, _ => unreachable!("unexpected run result {:?} in sync mode", run_result), } // If here then the run result did not require a particular action. We // return whether we have more active branches to run or not. if self.tree.queue_is_empty(QueueKind::Runnable) { return ConnectorScheduling::NotNow; } else { return ConnectorScheduling::Later; } } pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync()); let branch = self.tree.base_branch_mut(); debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); let mut run_context = ConnectorRunContext{ branch_id: branch.id, consensus: &self.consensus, received: &branch.inbox, scheduler: sched_ctx, prepared_channel: branch.prepared_channel.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); match run_result { RunResult::ComponentTerminated => { branch.sync_state = SpeculativeState::Finished; return ConnectorScheduling::Exit; }, RunResult::ComponentAtSyncStart => { comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); self.consensus.start_sync(comp_ctx); self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); return ConnectorScheduling::Immediate; }, RunResult::NewComponent(definition_id, 
monomorph_idx, arguments) => { // Note: we're relinquishing ownership of ports. But because // we are in non-sync mode the scheduler will handle and check // port ownership transfer. debug_assert!(comp_ctx.workspace_ports.is_empty()); find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports); let new_state = ComponentState { prompt: Prompt::new( &sched_ctx.runtime.protocol_description.types, &sched_ctx.runtime.protocol_description.heap, definition_id, monomorph_idx, arguments ), }; let new_component = ConnectorPDL::new(new_state); comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone()); comp_ctx.workspace_ports.clear(); return ConnectorScheduling::Later; }, RunResult::NewChannel => { let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); branch.prepared_channel = Some(( Value::Output(PortId::new(putter.self_id.index)), Value::Input(PortId::new(getter.self_id.index)), )); comp_ctx.push_port(putter); comp_ctx.push_port(getter); return ConnectorScheduling::Immediate; }, _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), } } pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) { let mut fake_vec = Vec::new(); self.tree.end_sync(solution_branch_id); self.consensus.end_sync(solution_branch_id, &mut fake_vec); for port in fake_vec { // TODO: Handle sent/received ports debug_assert!(ctx.get_port_by_id(port).is_some()); } ctx.notify_sync_end(&[]); } }