diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs index 63bf93d48624863bd605bf203596f14d3dab5e74..82eea098b06518f068ab584ac20f1988260ce872 100644 --- a/src/runtime2/connector2.rs +++ b/src/runtime2/connector2.rs @@ -1,16 +1,47 @@ +/// connector.rs +/// +/// Represents a component. A component (and the scheduler that is running it) +/// has many properties that are not easy to subdivide into aspects that are +/// conceptually handled by particular data structures. That is to say: the code +/// that we run governs: running PDL code, keeping track of ports, instantiating +/// new components and transports (i.e. interacting with the runtime), running +/// a consensus algorithm, etc. But on the other hand, our data is rather +/// simple: we have a speculative execution tree, a set of ports that we own, +/// and a bit of code that we should run. +/// +/// So currently the code is organized as following: +/// - The scheduler that is running the component is the authoritative source on +/// ports during *non-sync* mode. The consensus algorithm is the +/// authoritative source during *sync* mode. They retrieve each other's +/// state during the transitions. Hence port data exists duplicated between +/// these two datastructures. +/// - The execution tree is where executed branches reside. But the execution +/// tree is only aware of the tree shape itself (and keeps track of some +/// queues of branches that are in a particular state), and tends to store +/// the PDL program state. The consensus algorithm is also somewhat aware +/// of the execution tree, but only in terms of what is needed to complete +/// a sync round (for now, that means the port mapping in each branch). +/// Hence once more we have properties conceptually associated with branches +/// in two places. +/// - TODO: Write about handling messages, consensus wrapping data +/// - TODO: Write about way information is exchanged between PDL/component and scheduler through ctx + use std::sync::atomic::AtomicBool; -use crate::common::ComponentState; + use crate::PortId; -use crate::protocol::eval::{Value, ValueGroup}; +use crate::common::ComponentState; +use crate::protocol::eval::{Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; -use crate::runtime2::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState}; -use crate::runtime2::connector::ConnectorScheduling; -use crate::runtime2::consensus::{Consensus, Consistency}; -use crate::runtime2::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy}; -use crate::runtime2::inbox::PublicInbox; -use crate::runtime2::native::Connector; -use crate::runtime2::port::PortIdLocal; -use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx}; +use crate::runtime2::consensus::find_ports_in_value_group; +use crate::runtime2::port::PortKind; + +use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState}; +use super::consensus::{Consensus, Consistency}; +use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy}; +use super::inbox::PublicInbox; +use super::native::Connector; +use super::port::PortIdLocal; +use super::scheduler::{ComponentCtxFancy, SchedulerCtx}; pub(crate) struct ConnectorPublic { pub inbox: PublicInbox, @@ -26,13 +57,20 @@ impl ConnectorPublic { } } +#[derive(Eq, PartialEq)] +pub(crate) enum ConnectorScheduling { + Immediate, // Run again, immediately + Later, // Schedule for running, at some later point in time + NotNow, // Do not reschedule for running + Exit, // Connector has exited +} + pub(crate) struct ConnectorPDL { tree: ExecTree, consensus: Consensus, - branch_workspace: Vec, } -struct ConnectorRunContext {}; +struct ConnectorRunContext {} impl RunContext for ConnectorRunContext{ fn did_put(&mut self, port: PortId) -> bool { todo!() @@ -80,11 +118,11 @@ impl ConnectorPDL { pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. - debug_assert!(self.branch_workspace.is_empty()); + debug_assert!(ctx.workspace_branches.is_empty()); self.consensus.handle_received_sync_header(&message.sync_header, ctx); - self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut self.branch_workspace); + self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut ctx.workspace_branches); - for branch_id in self.branch_workspace.drain(..) { + for branch_id in ctx.workspace_branches.drain(..) { // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); @@ -107,6 +145,7 @@ impl ConnectorPDL { pub fn run_in_sync_mode(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { // Check if we have any branch that needs running + debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync()); let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); if branch_id.is_none() { return ConnectorScheduling::NotNow; @@ -175,7 +214,7 @@ impl ConnectorPDL { branch.insert_message(port_id, message.content.clone()); self.consensus.notify_of_new_branch(branch_id, recv_branch_id); - self.consensus.notify_of_received_message(recv_branch_id, &message.data_header); + self.consensus.notify_of_received_message(recv_branch_id, &message.data_header, &message.content); self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id); any_branch_received = true; @@ -198,13 +237,19 @@ impl ConnectorPDL { branch.sync_state == SpeculativeState::Inconsistent; } }, - RunResult::BranchPut(port_id, contents) => { + RunResult::BranchPut(port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); if consistency == Consistency::Valid { // `put()` is valid. - self.consensus. + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{ + sync_header, data_header, content + })); + + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; } else { branch.sync_state = SpeculativeState::Inconsistent; } @@ -220,4 +265,63 @@ impl ConnectorPDL { return ConnectorScheduling::Later; } } + + pub fn run_in_deterministic_mode(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync()); + + let branch = self.tree.base_branch_mut(); + debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); + + let mut run_context = ConnectorRunContext{}; + let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); + + match run_result { + RunResult::ComponentTerminated => { + branch.sync_state = SpeculativeState::Finished; + + return ConnectorScheduling::Exit; + }, + RunResult::ComponentAtSyncStart => { + let current_ports = comp_ctx.notify_sync_start(); + let sync_branch_id = self.tree.start_sync(); + self.consensus.start_sync(current_ports); + self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); + + return ConnectorScheduling::Immediate; + }, + RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { + // Note: we're relinquishing ownership of ports. But because + // we are in non-sync mode the scheduler will handle and check + // port ownership transfer. + debug_assert!(comp_ctx.workspace_ports.is_empty()); + find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports); + + let new_state = ComponentState { + prompt: Prompt::new( + &sched_ctx.runtime.protocol_description.types, + &sched_ctx.runtime.protocol_description.heap, + definition_id, monomorph_idx, arguments + ), + }; + let new_component = ConnectorPDL::new(new_state, comp_ctx.workspace_ports.clone()); + comp_ctx.push_component(new_component); + + return ConnectorScheduling::Later; + }, + RunResult::NewChannel => { + let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); + debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); + branch.prepared_channel = Some(( + Value::Input(PortId::new(putter.self_id.index)), + Value::Output(PortId::new(getter.self_id.index)), + )); + + comp_ctx.push_port(putter); + comp_ctx.push_port(getter); + + return ConnectorScheduling::Immediate; + }, + _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), + } + } } \ No newline at end of file