From a2b6b8e94778381ee846a2327ae95ef80cd71d08 2021-11-07 15:10:22 From: MH Date: 2021-11-07 15:10:22 Subject: [PATCH] initial rewrite of component using new ExecTree and Consensus --- diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index 16931db7d8d272feb58fb48d8950889cb978c032..2bc4b40970ebb3e4e47bb2cc73d876d2b9a0ef96 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -2,8 +2,8 @@ use std::collections::HashMap; use std::ops::{Index, IndexMut}; use crate::protocol::ComponentState; -use crate::protocol::eval::ValueGroup; -use crate::runtime2::port::PortIdLocal; +use crate::protocol::eval::{Value, ValueGroup}; +use crate::runtime2::port::{Port, PortIdLocal}; /// Generic branch ID. A component will always have one branch: the /// non-speculative branch. This branch has ID 0. Hence in a speculative context @@ -55,7 +55,8 @@ pub(crate) struct Branch { pub sync_state: SpeculativeState, pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. TODO: Maybe put in enum pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue` - pub inbox: HashMap; // TODO: Remove, currently only valid in single-get/put mode + pub inbox: HashMap, // TODO: Remove, currently only valid in single-get/put mode + pub prepared_channel: Option<(Value, Value)>, // TODO: Maybe remove? } impl Branch { @@ -69,6 +70,7 @@ impl Branch { awaiting_port: PortIdLocal::new_invalid(), next_in_queue: BranchId::new_invalid(), inbox: HashMap::new(), + prepared_channel: None, } } @@ -89,6 +91,7 @@ impl Branch { awaiting_port: parent_branch.awaiting_port, next_in_queue: BranchId::new_invalid(), inbox: parent_branch.inbox.clone(), + prepared_channel: None, } } @@ -210,6 +213,12 @@ impl ExecTree { } } + /// Returns the non-sync branch (TODO: better name?) 
+ pub fn base_branch_mut(&mut self) -> &mut Branch { + debug_assert!(!self.is_in_sync()); + return &mut self.branches[0]; + } + /// Returns an iterator over all the elements in the queue of the given kind pub fn iter_queue(&self, kind: QueueKind) -> BranchQueueIter { let queue = &self.queues[kind.as_index()]; @@ -232,13 +241,15 @@ impl ExecTree { // --- Preparing and finishing a speculative round /// Starts a synchronous round by cloning the non-sync branch and marking it - /// as the root of the speculative tree. - pub fn start_sync(&mut self) { + /// as the root of the speculative tree. The id of this root sync branch is + /// returned. + pub fn start_sync(&mut self) -> BranchId { debug_assert!(!self.is_in_sync()); let sync_branch = Branch::new_sync(1, &self.branches[0]); let sync_branch_id = sync_branch.id; self.branches.push(sync_branch); - self.push_into_queue(QueueKind::Runnable, sync_branch_id); + + return sync_branch_id; } /// Creates a new speculative branch based on the provided one. The index to diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs index 63bf93d48624863bd605bf203596f14d3dab5e74..82eea098b06518f068ab584ac20f1988260ce872 100644 --- a/src/runtime2/connector2.rs +++ b/src/runtime2/connector2.rs @@ -1,16 +1,47 @@ +/// connector.rs +/// +/// Represents a component. A component (and the scheduler that is running it) +/// has many properties that are not easy to subdivide into aspects that are +/// conceptually handled by particular data structures. That is to say: the code +/// that we run governs: running PDL code, keeping track of ports, instantiating +/// new components and transports (i.e. interacting with the runtime), running +/// a consensus algorithm, etc. But on the other hand, our data is rather +/// simple: we have a speculative execution tree, a set of ports that we own, +/// and a bit of code that we should run. 
+/// +/// So currently the code is organized as follows: +/// - The scheduler that is running the component is the authoritative source on +/// ports during *non-sync* mode. The consensus algorithm is the +/// authoritative source during *sync* mode. They retrieve each other's +/// state during the transitions. Hence port data exists duplicated between +/// these two data structures. +/// - The execution tree is where executed branches reside. But the execution +/// tree is only aware of the tree shape itself (and keeps track of some +/// queues of branches that are in a particular state), and tends to store +/// the PDL program state. The consensus algorithm is also somewhat aware +/// of the execution tree, but only in terms of what is needed to complete +/// a sync round (for now, that means the port mapping in each branch). +/// Hence once more we have properties conceptually associated with branches +/// in two places. +/// - TODO: Write about handling messages, consensus wrapping data +/// - TODO: Write about the way information is exchanged between PDL/component and scheduler through ctx + use std::sync::atomic::AtomicBool; -use crate::common::ComponentState; + use crate::PortId; -use crate::protocol::eval::{Value, ValueGroup}; +use crate::common::ComponentState; +use crate::protocol::eval::{Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; -use crate::runtime2::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState}; -use crate::runtime2::connector::ConnectorScheduling; -use crate::runtime2::consensus::{Consensus, Consistency}; -use crate::runtime2::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy}; -use crate::runtime2::inbox::PublicInbox; -use crate::runtime2::native::Connector; -use crate::runtime2::port::PortIdLocal; -use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx}; +use crate::runtime2::consensus::find_ports_in_value_group; +use crate::runtime2::port::PortKind; + +use super::branch::{Branch, 
BranchId, ExecTree, QueueKind, SpeculativeState}; +use super::consensus::{Consensus, Consistency}; +use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy}; +use super::inbox::PublicInbox; +use super::native::Connector; +use super::port::PortIdLocal; +use super::scheduler::{ComponentCtxFancy, SchedulerCtx}; pub(crate) struct ConnectorPublic { pub inbox: PublicInbox, @@ -26,13 +57,20 @@ impl ConnectorPublic { } } +#[derive(Eq, PartialEq)] +pub(crate) enum ConnectorScheduling { + Immediate, // Run again, immediately + Later, // Schedule for running, at some later point in time + NotNow, // Do not reschedule for running + Exit, // Connector has exited +} + pub(crate) struct ConnectorPDL { tree: ExecTree, consensus: Consensus, - branch_workspace: Vec, } -struct ConnectorRunContext {}; +struct ConnectorRunContext {} impl RunContext for ConnectorRunContext{ fn did_put(&mut self, port: PortId) -> bool { todo!() @@ -80,11 +118,11 @@ impl ConnectorPDL { pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. - debug_assert!(self.branch_workspace.is_empty()); + debug_assert!(ctx.workspace_branches.is_empty()); self.consensus.handle_received_sync_header(&message.sync_header, ctx); - self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut self.branch_workspace); + self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut ctx.workspace_branches); - for branch_id in self.branch_workspace.drain(..) { + for branch_id in ctx.workspace_branches.drain(..) 
{ // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); @@ -107,6 +145,7 @@ impl ConnectorPDL { pub fn run_in_sync_mode(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { // Check if we have any branch that needs running + debug_assert!(self.tree.is_in_sync() && self.consensus.is_in_sync()); let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); if branch_id.is_none() { return ConnectorScheduling::NotNow; @@ -175,7 +214,7 @@ impl ConnectorPDL { branch.insert_message(port_id, message.content.clone()); self.consensus.notify_of_new_branch(branch_id, recv_branch_id); - self.consensus.notify_of_received_message(recv_branch_id, &message.data_header); + self.consensus.notify_of_received_message(recv_branch_id, &message.data_header, &message.content); self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id); any_branch_received = true; @@ -198,13 +237,19 @@ impl ConnectorPDL { branch.sync_state == SpeculativeState::Inconsistent; } }, - RunResult::BranchPut(port_id, contents) => { + RunResult::BranchPut(port_id, content) => { // Branch is attempting to send data let port_id = PortIdLocal::new(port_id.0.u32_suffix); let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); if consistency == Consistency::Valid { // `put()` is valid. - self.consensus. 
+ let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); + comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{ + sync_header, data_header, content + })); + + self.tree.push_into_queue(QueueKind::Runnable, branch_id); + return ConnectorScheduling::Immediate; } else { branch.sync_state = SpeculativeState::Inconsistent; } @@ -220,4 +265,63 @@ impl ConnectorPDL { return ConnectorScheduling::Later; } } + + pub fn run_in_deterministic_mode(&mut self, sched_ctx: &SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling { + debug_assert!(!self.tree.is_in_sync() && !self.consensus.is_in_sync()); + + let branch = self.tree.base_branch_mut(); + debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); + + let mut run_context = ConnectorRunContext{}; + let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); + + match run_result { + RunResult::ComponentTerminated => { + branch.sync_state = SpeculativeState::Finished; + + return ConnectorScheduling::Exit; + }, + RunResult::ComponentAtSyncStart => { + let current_ports = comp_ctx.notify_sync_start(); + let sync_branch_id = self.tree.start_sync(); + self.consensus.start_sync(current_ports); + self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); + + return ConnectorScheduling::Immediate; + }, + RunResult::NewComponent(definition_id, monomorph_idx, arguments) => { + // Note: we're relinquishing ownership of ports. But because + // we are in non-sync mode the scheduler will handle and check + // port ownership transfer. 
+ debug_assert!(comp_ctx.workspace_ports.is_empty()); + find_ports_in_value_group(&arguments, &mut comp_ctx.workspace_ports); + + let new_state = ComponentState { + prompt: Prompt::new( + &sched_ctx.runtime.protocol_description.types, + &sched_ctx.runtime.protocol_description.heap, + definition_id, monomorph_idx, arguments + ), + }; + let new_component = ConnectorPDL::new(new_state, comp_ctx.workspace_ports.clone()); + comp_ctx.push_component(new_component); + + return ConnectorScheduling::Later; + }, + RunResult::NewChannel => { + let (getter, putter) = sched_ctx.runtime.create_channel(comp_ctx.id); + debug_assert!(getter.kind == PortKind::Getter && putter.kind == PortKind::Putter); + branch.prepared_channel = Some(( + Value::Input(PortId::new(putter.self_id.index)), + Value::Output(PortId::new(getter.self_id.index)), + )); + + comp_ctx.push_port(putter); + comp_ctx.push_port(getter); + + return ConnectorScheduling::Immediate; + }, + _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), + } + } } \ No newline at end of file diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 4e2be12a01bfddf3d22459770d76da2f91bfe2a1..d767ac3b32f5ba86419cf6db6ed055335bd16ca1 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -3,7 +3,7 @@ use crate::protocol::eval::ValueGroup; use crate::runtime2::branch::{BranchId, ExecTree, QueueKind}; use crate::runtime2::ConnectorId; use crate::runtime2::inbox2::{DataHeader, SyncHeader}; -use crate::runtime2::port::PortIdLocal; +use crate::runtime2::port::{Port, PortIdLocal}; use crate::runtime2::scheduler::ComponentCtxFancy; use super::inbox2::PortAnnotation; @@ -17,9 +17,12 @@ struct BranchAnnotation { /// /// The type itself serves as an experiment to see how code should be organized. 
// TODO: Flatten all datastructures +// TODO: Have a "branch+port position hint" in case multiple operations are +// performed on the same port to prevent repeated lookups pub(crate) struct Consensus { highest_connector_id: ConnectorId, branch_annotations: Vec, + workspace_ports: Vec, } #[derive(Clone, Copy, PartialEq, Eq)] @@ -38,10 +41,15 @@ impl Consensus { // --- Controlling sync round and branches + /// Returns whether the consensus algorithm is running in sync mode + pub fn is_in_sync(&self) -> bool { + return !self.branch_annotations.is_empty(); + } + /// Sets up the consensus algorithm for a new synchronous round. The /// provided ports should be the ports the component owns at the start of /// the sync round. - pub fn start_sync(&mut self, ports: &[PortIdLocal]) { + pub fn start_sync(&mut self, ports: &[Port]) { debug_assert!(self.branch_annotations.is_empty()); debug_assert!(!self.highest_connector_id.is_valid()); @@ -50,7 +58,7 @@ impl Consensus { self.branch_annotations.push(BranchAnnotation{ port_mapping: ports.iter() .map(|v| PortAnnotation{ - port_id: *v, + port_id: v.self_id, registered_id: None, expected_firing: None, }) @@ -127,16 +135,48 @@ impl Consensus { /// Prepares a message for sending. Caller should have made sure that /// sending the message is consistent with the speculative state. 
- pub fn prepare_message(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, value: &ValueGroup) -> (SyncHeader, DataHeader) { + pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtxFancy) -> (SyncHeader, DataHeader) { + debug_assert!(self.is_in_sync()); + let branch = &mut self.branch_annotations[branch_id.index as usize]; + if cfg!(debug_assertions) { - let branch = &self.branch_annotations[branch_id.index as usize]; let port = branch.port_mapping.iter() .find(|v| v.port_id == source_port_id) .unwrap(); debug_assert!(port.expected_firing == None || port.expected_firing == Some(true)); } - + // Check for ports that are being sent + debug_assert!(self.workspace_ports.is_empty()); + find_ports_in_value_group(content, &mut self.workspace_ports); + if !self.workspace_ports.is_empty() { + todo!("handle sending ports"); + self.workspace_ports.clear(); + } + + let sync_header = SyncHeader{ + sending_component_id: ctx.id, + highest_component_id: self.highest_connector_id, + }; + + // TODO: Handle multiple firings. Right now we just assign the current + // branch to the `None` value because we know we can only send once. 
+ debug_assert!(branch.port_mapping.iter().find(|v| v.port_id == source_port_id).unwrap().registered_id.is_none()); + let port_info = ctx.get_port_by_id(source_port_id).unwrap(); + let data_header = DataHeader{ + expected_mapping: branch.port_mapping.clone(), + target_port: port_info.peer_id, + new_mapping: branch_id + }; + + for mapping in &mut branch.port_mapping { + if mapping.port_id == source_port_id { + mapping.expected_firing = Some(true); + mapping.registered_id = Some(branch_id); + } + } + + return (sync_header, data_header); } pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) { @@ -161,12 +201,22 @@ impl Consensus { } } - pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader) { + pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader, content: &ValueGroup) { debug_assert!(self.branch_can_receive(branch_id, data_header)); let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { if mapping.port_id == data_header.target_port { + // Found the port in which the message should be inserted mapping.registered_id = Some(data_header.new_mapping); + + // Check for sent ports + debug_assert!(self.workspace_ports.is_empty()); + find_ports_in_value_group(content, &mut self.workspace_ports); + if !self.workspace_ports.is_empty() { + todo!("handle received ports"); + self.workspace_ports.clear(); + } + return; } } @@ -196,4 +246,47 @@ impl Consensus { return true; } +} + +/// Recursively goes through the value group, attempting to find ports. +/// Duplicates will only be added once. +pub(crate) fn find_ports_in_value_group(value_group: &ValueGroup, ports: &mut Vec) { + // Helper to check a value for a port and recurse if needed. 
+ use crate::protocol::eval::Value; + + fn find_port_in_value(group: &ValueGroup, value: &Value, ports: &mut Vec) { + match value { + Value::Input(port_id) | Value::Output(port_id) => { + // This is an actual port + let cur_port = PortIdLocal::new(port_id.0.u32_suffix); + for prev_port in ports.iter() { + if *prev_port == cur_port { + // Already added + return; + } + } + + ports.push(cur_port); + }, + Value::Array(heap_pos) | + Value::Message(heap_pos) | + Value::String(heap_pos) | + Value::Struct(heap_pos) | + Value::Union(_, heap_pos) => { + // Reference to some dynamic thing which might contain ports, + // so recurse + let heap_region = &group.regions[*heap_pos as usize]; + for embedded_value in heap_region { + find_port_in_value(group, embedded_value, ports); + } + }, + _ => {}, // values we don't care about + } + } + + // Clear the ports, then scan all the available values + ports.clear(); + for value in &value_group.values { + find_port_in_value(value_group, value, ports); + } } \ No newline at end of file diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs index 44e212f8d94a70c3bcd30bdd64c4cf168ea5433a..19371b8bea7327ad1854c21af335f37ebd077341 100644 --- a/src/runtime2/scheduler.rs +++ b/src/runtime2/scheduler.rs @@ -411,8 +411,12 @@ pub(crate) struct ComponentCtxFancy { // Submitted by the component is_in_sync: bool, changed_in_sync: bool, - outbox: VecDeque, + outbox: VecDeque, state_changes: VecDeque + // Workspaces that may be used by components to (generally) prevent + // allocations. Be a good scout and leave it empty after you've used it. + pub workspace_ports: Vec, + pub workspace_branches: Vec, } pub(crate) enum ReceivedMessage { @@ -478,7 +482,7 @@ impl ComponentCtxFancy { /// Submit a message for the scheduler to send to the appropriate receiver. /// May only be called inside of a sync block. 
- pub(crate) fn submit_message(&mut self, contents: MessageContents) { + pub(crate) fn submit_message(&mut self, contents: MessageFancy) { debug_assert!(self.is_in_sync); self.outbox.push_back(contents); }