diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs deleted file mode 100644 index f33e6c5c2bb5bdbcfa4949ebb9fe7ee0c7de128c..0000000000000000000000000000000000000000 --- a/src/runtime2/connector2.rs +++ /dev/null @@ -1,385 +0,0 @@ -use std::collections::HashMap; -/// connector.rs -/// -/// Represents a component. A component (and the scheduler that is running it) -/// has many properties that are not easy to subdivide into aspects that are -/// conceptually handled by particular data structures. That is to say: the code -/// that we run governs: running PDL code, keeping track of ports, instantiating -/// new components and transports (i.e. interacting with the runtime), running -/// a consensus algorithm, etc. But on the other hand, our data is rather -/// simple: we have a speculative execution tree, a set of ports that we own, -/// and a bit of code that we should run. -/// -/// So currently the code is organized as following: -/// - The scheduler that is running the component is the authoritative source on -/// ports during *non-sync* mode. The consensus algorithm is the -/// authoritative source during *sync* mode. They retrieve each other's -/// state during the transitions. Hence port data exists duplicated between -/// these two datastructures. -/// - The execution tree is where executed branches reside. But the execution -/// tree is only aware of the tree shape itself (and keeps track of some -/// queues of branches that are in a particular state), and tends to store -/// the PDL program state. The consensus algorithm is also somewhat aware -/// of the execution tree, but only in terms of what is needed to complete -/// a sync round (for now, that means the port mapping in each branch). -/// Hence once more we have properties conceptually associated with branches -/// in two places. -/// - TODO: Write about handling messages, consensus wrapping data -/// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx - -use std::sync::atomic::AtomicBool; - -use crate::PortId; -use crate::common::ComponentState; -use crate::protocol::eval::{Prompt, Value, ValueGroup}; -use crate::protocol::{RunContext, RunResult}; -use crate::runtime2::consensus::find_ports_in_value_group; -use crate::runtime2::inbox2::DataContent; -use crate::runtime2::port::PortKind; - -use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; -use super::consensus::{Consensus, Consistency}; -use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy, PublicInbox}; -use super::native::Connector; -use super::port::PortIdLocal; -use super::scheduler::{ComponentCtxFancy, SchedulerCtx}; - -pub(crate) struct ConnectorPublic { - pub inbox: PublicInbox, - pub sleeping: AtomicBool, -} - -impl ConnectorPublic { - pub fn new(initialize_as_sleeping: bool) -> Self { - ConnectorPublic{ - inbox: PublicInbox::new(), - sleeping: AtomicBool::new(initialize_as_sleeping), - } - } -} - -#[derive(Eq, PartialEq)] -pub(crate) enum ConnectorScheduling { - Immediate, // Run again, immediately - Later, // Schedule for running, at some later point in time - NotNow, // Do not reschedule for running - Exit, // Connector has exited -} - -pub(crate) struct ConnectorPDL { - tree: ExecTree, - consensus: Consensus, -} - -struct ConnectorRunContext<'a> { - branch_id: BranchId, - consensus: &'a Consensus, - received: &'a HashMap, - scheduler: SchedulerCtx<'a>, - prepared_channel: Option<(Value, Value)>, -} - -impl<'a> RunContext for ConnectorRunContext<'a>{ - fn did_put(&mut self, port: PortId) -> bool { - let port_id = PortIdLocal::new(port.0.u32_suffix); - let annotation = self.consensus.get_annotation(self.branch_id, port_id); - return annotation.registered_id.is_some(); - } - - fn get(&mut self, port: PortId) -> Option { - let port_id = PortIdLocal::new(port.0.u32_suffix); - match self.received.get(&port_id) { - Some(data) => Some(data.clone()), - None => None, - } - } - - fn fires(&mut self, port: PortId) -> Option { - let port_id = PortIdLocal::new(port.0.u32_suffix); - let annotation = self.consensus.get_annotation(self.branch_id, port_id); - return annotation.expected_firing.map(|v| Value::Bool(v)); - } - - fn get_channel(&mut self) -> Option<(Value, Value)> { - return self.prepared_channel.take(); - } -} - -impl Connector for ConnectorPDL { - fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { - self.handle_new_messages(comp_ctx); - if self.tree.is_in_sync() { - let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); - if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { - self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); - return ConnectorScheduling::Immediate; - } else { - return scheduling - } - } else { - let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); - return scheduling; - } - } -} - -impl ConnectorPDL { - pub fn new(initial: ComponentState) -> Self { - Self{ - tree: ExecTree::new(initial), - consensus: Consensus::new(), - } - } - - // --- Handling messages - - pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) { - while let Some(message) = ctx.read_next_message() { - match message { - MessageFancy::Data(message) => self.handle_new_data_message(message, ctx), - MessageFancy::Sync(message) => self.handle_new_sync_message(message, ctx), - MessageFancy::Control(_) => unreachable!("control message in component"), - } - } - } - - pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) { - // Go through all branches that are awaiting new messages and see if - // there is one that can receive this message. - debug_assert!(ctx.workspace_branches.is_empty()); - let mut branches = Vec::new(); // TODO: @Remove - self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches); - - for branch_id in branches.drain(..) { - // This branch can receive, so fork and given it the message - let receiving_branch_id = self.tree.fork_branch(branch_id); - self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - let receiving_branch = &mut self.tree[receiving_branch_id]; - - receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); - - // And prepare the branch for running - self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); - } - } - - pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) { - if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { - self.collapse_sync_to_solution_branch(solution_branch_id, ctx); - } - } - - // --- Running code - - pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { - // Check if we have any branch that needs running - debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync()); - let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); - if branch_id.is_none() { - return ConnectorScheduling::NotNow; - } - - // Retrieve the branch and run it - let branch_id = branch_id.unwrap(); - let branch = &mut self.tree[branch_id]; - - let mut run_context = ConnectorRunContext{ - branch_id, - consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - prepared_channel: branch.prepared_channel.take(), - }; - let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); - - // Handle the returned result. Note that this match statement contains - // explicit returns in case the run result requires that the component's - // code is ran again immediately - match run_result { - RunResult::BranchInconsistent => { - // Branch became inconsistent - branch.sync_state = SpeculativeState::Inconsistent; - }, - RunResult::BranchMissingPortState(port_id) => { - // Branch called `fires()` on a port that has not been used yet. - let port_id = PortIdLocal::new(port_id.0.u32_suffix); - - // Create two forks, one that assumes the port will fire, and - // one that assumes the port remains silent - branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - - let firing_branch_id = self.tree.fork_branch(branch_id); - let silent_branch_id = self.tree.fork_branch(branch_id); - self.consensus.notify_of_new_branch(branch_id, firing_branch_id); - let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true); - debug_assert_eq!(_result, Consistency::Valid); - self.consensus.notify_of_new_branch(branch_id, silent_branch_id); - let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false); - debug_assert_eq!(_result, Consistency::Valid); - - // Somewhat important: we push the firing one first, such that - // that branch is ran again immediately. - self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id); - self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id); - - return ConnectorScheduling::Immediate; - }, - RunResult::BranchMissingPortValue(port_id) => { - // Branch performed a `get()` on a port that does not have a - // received message on that port. - let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `get()` is valid, so mark the branch as awaiting a message - branch.sync_state = SpeculativeState::HaltedAtBranchPoint; - branch.awaiting_port = port_id; - self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); - - // Note: we only know that a branch is waiting on a message when - // it reaches the `get` call. But we might have already received - // a message that targets this branch, so check now. - let mut any_branch_received = false; - for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) { - // This branch can receive the message, so we do the - // fork-and-receive dance - let receiving_branch_id = self.tree.fork_branch(branch_id); - let branch = &mut self.tree[receiving_branch_id]; - - branch.insert_message(port_id, message.content.as_message().unwrap().clone()); - - self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); - self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); - - any_branch_received = true; - } - } - - if any_branch_received { - return ConnectorScheduling::Immediate; - } - } else { - branch.sync_state = SpeculativeState::Inconsistent; - } - } - RunResult::BranchAtSyncEnd => { - let consistency = self.consensus.notify_of_finished_branch(branch_id); - if consistency == Consistency::Valid { - branch.sync_state = SpeculativeState::ReachedSyncEnd; - self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); - } else if consistency == Consistency::Inconsistent { - branch.sync_state = SpeculativeState::Inconsistent; - } - }, - RunResult::BranchPut(port_id, content) => { - // Branch is attempting to send data - let port_id = PortIdLocal::new(port_id.0.u32_suffix); - let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); - if consistency == Consistency::Valid { - // `put()` is valid. - let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); - comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{ - sync_header, data_header, - content: DataContent::Message(content), - })); - - self.tree.push_into_queue(QueueKind::Runnable, branch_id); - return ConnectorScheduling::Immediate; - } else { - branch.sync_state = SpeculativeState::Inconsistent; - } - }, - _ => unreachable!("unexpected run result {:?} in sync mode", run_result), - } - - // If here then the run result did not require a particular action. We - // return whether we have more active branches to run or not. - if self.tree.queue_is_empty(QueueKind::Runnable) { - return ConnectorScheduling::NotNow; - } else { - return ConnectorScheduling::Later; - } - } - - pub fn run_in_deterministic_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { - debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync()); - - let branch = self.tree.base_branch_mut(); - debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); - - let mut run_context = ConnectorRunContext{ - branch_id: branch.id, - consensus: &self.consensus, - received: &branch.inbox, - scheduler: sched_ctx, - prepared_channel: branch.prepared_channel.take(), - }; - let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); - - match run_result { - RunResult::ComponentTerminated => { - branch.sync_state = SpeculativeState::Finished; - - return ConnectorScheduling::Exit; - }, - RunResult::ComponentAtSyncStart => { - comp_ctx.notify_sync_start(); - let sync_branch_id = self.tree.start_sync(); - self.consensus.start_sync(comp_ctx); - self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); - self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); - - return ConnectorScheduling::Immediate; - }, - RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { - // Note: we're relinquishing ownership of ports. But because - // we are in non-sync mode the scheduler will handle and check - // port ownership transfer. - debug_assert!(comp_ctx.workspace_ports.is_empty()); - find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports); - - let new_state = ComponentState { - prompt: Prompt::new( - &sched_ctx.runtime.protocol_description.types, - &sched_ctx.runtime.protocol_description.heap, - definition_id, monomorph_idx, arguments - ), - }; - let new_component = ConnectorPDL::new(new_state); - comp_ctx.push_component(new_component, comp_ctx.workspace_ports.clone()); - comp_ctx.workspace_ports.clear(); - - return ConnectorScheduling::Later; - }, - RunResult::NewChannel => { - let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); - debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); - branch.prepared_channel = Some(( - Value::Input(PortId::new(putter.self_id.index)), - Value::Output(PortId::new(getter.self_id.index)), - )); - - comp_ctx.push_port(putter); - comp_ctx.push_port(getter); - - return ConnectorScheduling::Immediate; - }, - _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), - } - } - - pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtxFancy) { - let mut fake_vec = Vec::new(); - self.tree.end_sync(solution_branch_id); - self.consensus.end_sync(solution_branch_id, &mut fake_vec); - - for port in fake_vec { - // TODO: Handle sent/received ports - debug_assert!(ctx.get_port_by_id(port).is_some()); - } - - ctx.notify_sync_end(&[]); - } -} \ No newline at end of file