use std::collections::HashMap; /// connector.rs /// /// Represents a component. A component (and the scheduler that is running it) /// has many properties that are not easy to subdivide into aspects that are /// conceptually handled by particular data structures. That is to say: the code /// that we run governs: running PDL code, keeping track of ports, instantiating /// new components and transports (i.e. interacting with the runtime), running /// a consensus algorithm, etc. But on the other hand, our data is rather /// simple: we have a speculative execution tree, a set of ports that we own, /// and a bit of code that we should run. /// /// So currently the code is organized as following: /// - The scheduler that is running the component is the authoritative source on /// ports during *non-sync* mode. The consensus algorithm is the /// authoritative source during *sync* mode. They retrieve each other's /// state during the transitions. Hence port data exists duplicated between /// these two datastructures. /// - The execution tree is where executed branches reside. But the execution /// tree is only aware of the tree shape itself (and keeps track of some /// queues of branches that are in a particular state), and tends to store /// the PDL program state. The consensus algorithm is also somewhat aware /// of the execution tree, but only in terms of what is needed to complete /// a sync round (for now, that means the port mapping in each branch). /// Hence once more we have properties conceptually associated with branches /// in two places. /// - TODO: Write about handling messages, consensus wrapping data /// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx use std::sync::atomic::AtomicBool; use crate::PortId; use crate::common::ComponentState; use crate::protocol::eval::{Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; use crate::runtime2::consensus::find_ports_in_value_group; use crate::runtime2::port::PortKind; use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState}; use super::consensus::{Consensus, Consistency}; use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy}; use super::inbox::PublicInbox; use super::native::Connector; use super::port::PortIdLocal; use super::scheduler::{ComponentCtxFancy, SchedulerCtx}; pub(crate) struct ConnectorPublic { pub inbox: PublicInbox, pub sleeping: AtomicBool, } impl ConnectorPublic { pub fn new(initialize_as_sleeping: bool) -> Self { ConnectorPublic{ inbox: PublicInbox::new(), sleeping: AtomicBool::new(initialize_as_sleeping), } } } #[derive(Eq, PartialEq)] pub(crate) enum ConnectorScheduling { Immediate, // Run again, immediately Later, // Schedule for running, at some later point in time NotNow, // Do not reschedule for running Exit, // Connector has exited } pub(crate) struct ConnectorPDL { tree: ExecTree, consensus: Consensus, } struct ConnectorRunContext<'a> { branch_id: BranchId, consensus: &'a Consensus, received: &'a HashMap, scheduler: SchedulerCtx<'a>, prepared_channel: Option<(Value, Value)>, } impl RunContext for ConnectorRunContext{ fn did_put(&mut self, port: PortId) -> bool { let port_id = PortIdLocal::new(port.0.u32_suffix); let annotation = self.consensus.get_annotation(self.branch_id, port_id); return annotation.registered_id.is_some(); } fn get(&mut self, port: PortId) -> Option { let port_id = PortIdLocal::new(port.0.u32_suffix); match self.received.get(&port_id) { Some(data) => Some(data.clone()), None => None, } } fn fires(&mut self, port: PortId) -> Option { let port_id = PortIdLocal::new(port.0.u32_suffix); let annotation = self.consensus.get_annotation(self.branch_id, port_id); return annotation.expected_firing.map(|v| Value::Bool(v)); } fn get_channel(&mut self) -> Option<(Value, Value)> { return self.prepared_channel.take(); } } impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { self.handle_new_messages(comp_ctx); if self.tree.is_in_sync() { let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); self.consensus.handle_new_finished_sync_branches(); return scheduling; } else { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); return scheduling; } } } impl ConnectorPDL { pub fn new(initial: ComponentState) -> Self { Self{ tree: ExecTree::new(initial), consensus: Consensus::new(), } } // --- Handling messages pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) { while let Some(message) = ctx.read_next_message() { match message { MessageFancy::Data(message) => handle_new_data_message(message, ctx), MessageFancy::Sync(message) => handle_new_sync_message(message, ctx), MessageFancy::Control(_) => unreachable!("control message in component"), } } } pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. debug_assert!(ctx.workspace_branches.is_empty()); self.consensus.handle_received_sync_header(&message.sync_header, ctx); self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut ctx.workspace_branches); for branch_id in ctx.workspace_branches.drain(..) { // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.clone()); self.consensus.notify_of_received_message(branch_id, &message.data_header, &message.content); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); } } pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) { self.consensus.handle_received_sync_header(&message.sync_header, ctx); todo!("handle content of message?"); } // --- Running code pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { // Check if we have any branch that needs running debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync()); let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); if branch_id.is_none() { return ConnectorScheduling::NotNow; } // Retrieve the branch and run it let branch_id = branch_id.unwrap(); let branch = &mut self.tree[branch_id]; let mut run_context = ConnectorRunContext{ branch_id, consensus: &self.consensus, received: &branch.inbox, scheduler: *sched_ctx, prepared_channel: branch.prepared_channel.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); // Handle the returned result. Note that this match statement contains // explicit returns in case the run result requires that the component's // code is ran again immediately match run_result { RunResult::BranchInconsistent => { // Branch became inconsistent branch.sync_state = SpeculativeState::Inconsistent; }, RunResult::BranchMissingPortState(port_id) => { // Branch called `fires()` on a port that has not been used yet. let port_id = PortIdLocal::new(port_id.0.u32_suffix); // Create two forks, one that assumes the port will fire, and // one that assumes the port remains silent branch.sync_state = SpeculativeState::HaltedAtBranchPoint; let firing_branch_id = self.tree.fork_branch(branch_id); let silent_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, firing_branch_id); let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true); debug_assert_eq!(_result, Consistency::Valid); self.consensus.notify_of_new_branch(branch_id, silent_branch_id); let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false); debug_assert_eq!(_result, Consistency::Valid); // Somewhat important: we push the firing one first, such that // that branch is ran again immediately. self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id); self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id); return ConnectorScheduling::Immediate; }, RunResult::BranchMissingPortValue(port_id) => { // Branch performed a `get()` on a port that does not have a // received message on that port. let port_id = PortIdLocal::new(port_id.0.u32_suffix); let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); if consistency == Consistency::Valid { // `get()` is valid, so mark the branch as awaiting a message branch.sync_state = SpeculativeState::HaltedAtBranchPoint; branch.awaiting_port = port_id; self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); // Note: we only know that a branch is waiting on a message when // it reaches the `get` call. But we might have already received // a message that targets this branch, so check now. let mut any_branch_received = false; for message in comp_ctx.get_read_data_messages(port_id) { if self.consensus.branch_can_receive(branch_id, &message.data_header) { // This branch can receive the message, so we do the // fork-and-receive dance let recv_branch_id = self.tree.fork_branch(branch_id); let branch = &mut self.tree[recv_branch_id]; branch.insert_message(port_id, message.content.clone()); self.consensus.notify_of_new_branch(branch_id, recv_branch_id); self.consensus.notify_of_received_message(recv_branch_id, &message.data_header, &message.content); self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id); any_branch_received = true; } } if any_branch_received { return ConnectorScheduling::Immediate; } } else { branch.sync_state = SpeculativeState::Inconsistent; } } RunResult::BranchAtSyncEnd => { let consistency = self.consensus.notify_of_finished_branch(branch_id); if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); } else if consistency == Consistency::Inconsistent { branch.sync_state == SpeculativeState::Inconsistent; } }, RunResult::BranchPut(port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); if consistency == Consistency::Valid { // `put()` is valid. let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{ sync_header, data_header, content })); self.tree.push_into_queue(QueueKind::Runnable, branch_id); return ConnectorScheduling::Immediate; } else { branch.sync_state = SpeculativeState::Inconsistent; } }, _ => unreachable!("unexpected run result {:?} in sync mode", run_result), } // If here then the run result did not require a particular action. We // return whether we have more active branches to run or not. if self.tree.queue_is_empty(QueueKind::Runnable) { return ConnectorScheduling::NotNow; } else { return ConnectorScheduling::Later; } } pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync()); let branch = self.tree.base_branch_mut(); debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); let mut run_context = ConnectorRunContext{ branch_id, consensus: &self.consensus, received: &branch.inbox, scheduler: *sched_ctx, prepared_channel: branch.prepared_channel.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); match run_result { RunResult::ComponentTerminated => { branch.sync_state = SpeculativeState::Finished; return ConnectorScheduling::Exit; }, RunResult::ComponentAtSyncStart => { let current_ports = comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); self.consensus.start_sync(current_ports); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); return ConnectorScheduling::Immediate; }, RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { // Note: we're relinquishing ownership of ports. But because // we are in non-sync mode the scheduler will handle and check // port ownership transfer. debug_assert!(comp_ctx.workspace_ports.is_empty()); find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports); let new_state = ComponentState { prompt: Prompt::new( &sched_ctx.runtime.protocol_description.types, &sched_ctx.runtime.protocol_description.heap, definition_id, monomorph_idx, arguments ), }; let new_component = ConnectorPDL::new(new_state); comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone()); comp_ctx.workspace_ports.clear(); return ConnectorScheduling::Later; }, RunResult::NewChannel => { let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); branch.prepared_channel = Some(( Value::Input(PortId::new(putter.self_id.index)), Value::Output(PortId::new(getter.self_id.index)), )); comp_ctx.push_port(putter); comp_ctx.push_port(getter); return ConnectorScheduling::Immediate; }, _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), } } }