From ce98be9707a652d03ea8243165aadaeb234e6e60 2021-11-06 18:59:24
From: MH
Date: 2021-11-06 18:59:24
Subject: [PATCH] wip on refactoring component

---

diff --git a/docs/runtime/design.md b/docs/runtime/design.md
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..7e25c9374999a2424a876417a5f037fd161ef568 100644
--- a/docs/runtime/design.md
+++ b/docs/runtime/design.md
@@ -0,0 +1,16 @@
+# Runtime Design
+
+## Preliminary preliminaries
+
+There may be some confusion when we use the word "synchronization", so to be clear: when we talk about OS-syncing we mean using synchronization primitives such as atomics, mutexes, semaphores, etc. When we talk about sync-blocks, sync-regions, etc. we mean the Reowolf language's distributed consensus feature.
+
+## Preliminary Notes
+
+The runtime was designed in several iterations. For the purpose of documentation, we have had:
+
+- Reowolf 1.0: A single-threaded, globally locking runtime.
+- Initial 1.2: A single-threaded runtime, no longer globally locking. The newly designed consensus algorithm worked quite well and reasonably efficiently (not measured against another runtime; rather, the consensus algorithm itself was simple and cheap to execute).
+- Multithreaded 1.2, v1: Here is where we moved towards a more multithreaded design. From the start, the idea was to "maximize concurrency", that is to say: we should only use OS-syncing where it is strictly necessary. Furthermore, the initial implementation should be reasonably efficient: we should not employ locks when they are not strictly needed. The following remarks can be made with respect to this initial multithreaded implementation:
+  - Because there will generally be far more components than there are hardware threads, an efficient implementation requires some kind of scheduler that is able to execute the code of components. To track which components are supposed to run, there will be a work queue. The initial implementation features just a single global work queue. Each thread that executes components is called a scheduler in this document.
+  - At the most basic level, a component has properties that allow access by only one writer at a time, and properties that are conceptually (of course we still need some kind of OS synchronization) accessible by multiple writers at a time. At the very least, executing the code of a component should only be performed by one scheduler at a time, while sending messages to a component should be possible from multiple schedulers at once. Hence the runtime splits component properties into two: those that should only be accessed by the scheduler that is executing the code, and those that should be accessible by all schedulers at any time. A sketch of this split is shown below.
+  - Components communicate with each other through transport links. Since messages need to arrive at the correct target, ... TODO: FINISH
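+
+As a rough sketch, the split looks as follows (the names below are illustrative only, they do not match the actual runtime types):
+
+```rust
+use std::sync::Mutex;
+use std::sync::atomic::AtomicBool;
+
+// Placeholder payload types, just for the sketch.
+struct CodeState;
+struct Message;
+
+/// Only touched by the scheduler that is currently executing the component's
+/// code, so no OS-syncing is required for these fields.
+struct ComponentPrivate {
+    code_state: CodeState,
+}
+
+/// May be touched by any scheduler at any time, so every field is OS-synced.
+struct ComponentShared {
+    inbox: Mutex<Vec<Message>>, // messages sent to this component by others
+    sleeping: AtomicBool,       // set while no scheduler is executing the component
+}
+
+struct Component {
+    private: ComponentPrivate,
+    shared: ComponentShared,
+}
+```
\ No newline at end of file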
diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs
index f67f2e5570a5743c3680ae3af354a0bd59ce13d7..16931db7d8d272feb58fb48d8950889cb978c032 100644
--- a/src/runtime2/branch.rs
+++ b/src/runtime2/branch.rs
@@ -1,6 +1,9 @@
+use std::collections::HashMap;
 use std::ops::{Index, IndexMut};

 use crate::protocol::ComponentState;
+use crate::protocol::eval::ValueGroup;
+use crate::runtime2::port::PortIdLocal;

 /// Generic branch ID. A component will always have one branch: the
 /// non-speculative branch. This branch has ID 0. Hence in a speculative context
@@ -50,7 +53,9 @@ pub(crate) struct Branch {
     // Execution state
     pub code_state: ComponentState,
     pub sync_state: SpeculativeState,
+    pub awaiting_port: PortIdLocal, // only valid if in "awaiting message" queue. TODO: Maybe put in enum
     pub next_in_queue: BranchId, // used by `ExecTree`/`BranchQueue`
+    pub inbox: HashMap<PortIdLocal, ValueGroup>, // TODO: Remove, currently only valid in single-get/put mode
 }

 impl Branch {
@@ -61,7 +66,9 @@ impl Branch {
             parent_id: BranchId::new_invalid(),
             code_state: component_state,
             sync_state: SpeculativeState::RunningNonSync,
+            awaiting_port: PortIdLocal::new_invalid(),
             next_in_queue: BranchId::new_invalid(),
+            inbox: HashMap::new(),
         }
     }

@@ -79,9 +86,20 @@ impl Branch {
             parent_id: parent_branch.index,
             code_state: parent_branch.code_state.clone(),
             sync_state: SpeculativeState::RunningInSync,
+            awaiting_port: parent_branch.awaiting_port,
             next_in_queue: BranchId::new_invalid(),
+            inbox: parent_branch.inbox.clone(),
         }
     }
+
+    /// Inserts a message into the branch for retrieval by a corresponding
+    /// `get(port)` call.
+    pub(crate) fn insert_message(&mut self, target_port: PortIdLocal, contents: ValueGroup) {
+        debug_assert!(target_port.is_valid());
+        debug_assert!(self.awaiting_port == target_port);
+        self.awaiting_port = PortIdLocal::new_invalid();
+        self.inbox.insert(target_port, contents);
+    }
 }

 /// Queue of branches. Just a little helper.
@@ -157,6 +175,11 @@ impl ExecTree {
         return self.branches.len() != 1;
     }

+    /// Returns true if the particular queue is empty.
+    pub fn queue_is_empty(&self, kind: QueueKind) -> bool {
+        return self.queues[kind.as_index()].is_empty();
+    }
+
     /// Pops a branch (ID) from a queue.
     pub fn pop_from_queue(&mut self, kind: QueueKind) -> Option<BranchId> {
         let queue = &mut self.queues[kind.as_index()];
@@ -187,15 +210,25 @@ impl ExecTree {
         }
     }

-    pub fn iter_queue(&self, kind: QueueKind) -> BranchIter {
+    /// Returns an iterator over all the elements in the queue of the given kind.
+    pub fn iter_queue(&self, kind: QueueKind) -> BranchQueueIter {
         let queue = &self.queues[kind.as_index()];
         let index = queue.first as usize;

-        return BranchIter{
+        return BranchQueueIter {
             branches: self.branches.as_slice(),
             index,
         }
     }

+    /// Returns an iterator that starts with the provided branch, and then
+    /// continues to visit all of the branch's parents.
+    pub fn iter_parents(&self, branch_id: BranchId) -> BranchParentIter {
+        return BranchParentIter{
+            branches: self.branches.as_slice(),
+            index: branch_id.index as usize,
+        }
+    }
+
     // --- Preparing and finishing a speculative round

     /// Starts a synchronous round by cloning the non-sync branch and marking it
@@ -210,16 +243,12 @@
     /// Creates a new speculative branch based on the provided one. The index to
     /// retrieve this new branch will be returned.
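+    /// The new branch is not pushed into any of the queues; the caller is
+    /// responsible for queueing it.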
-    pub fn fork_branch(&mut self, parent_branch_id: BranchId, initial_queue: Option<QueueKind>) -> BranchId {
+    pub fn fork_branch(&mut self, parent_branch_id: BranchId) -> BranchId {
         debug_assert!(self.is_in_sync());

         let parent_branch = &self[parent_branch_id];
         let new_branch = Branch::new_sync(1, parent_branch);
         let new_branch_id = new_branch.id;

-        if let Some(kind) = initial_queue {
-            self.push_into_queue(kind, new_branch_id);
-        }
-
         return new_branch_id;
     }

@@ -263,12 +292,12 @@ impl IndexMut<BranchId> for ExecTree {
     }
 }

-pub struct BranchIter<'a> {
+pub struct BranchQueueIter<'a> {
     branches: &'a [Branch],
     index: usize,
 }

-impl<'a> Iterator for BranchIter<'a> {
+impl<'a> Iterator for BranchQueueIter<'a> {
     type Item = &'a Branch;

     fn next(&mut self) -> Option<Self::Item> {
@@ -281,4 +310,23 @@
         self.index = branch.next_in_queue.index as usize;
         return Some(branch);
     }
+}
+
+pub struct BranchParentIter<'a> {
+    branches: &'a [Branch],
+    index: usize,
+}
+
+impl<'a> Iterator for BranchParentIter<'a> {
+    type Item = &'a Branch;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.index == 0 {
+            return None;
+        }
+
+        let branch = &self.branches[self.index];
+        self.index = branch.parent_id.index as usize;
+        return Some(branch);
+    }
 }
\ No newline at end of file
diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs
new file mode 100644
index 0000000000000000000000000000000000000000..63bf93d48624863bd605bf203596f14d3dab5e74
--- /dev/null
+++ b/src/runtime2/connector2.rs
@@ -0,0 +1,223 @@
+use std::sync::atomic::AtomicBool;
+
+use crate::common::ComponentState;
+use crate::PortId;
+use crate::protocol::eval::{Value, ValueGroup};
+use crate::protocol::{RunContext, RunResult};
+use crate::runtime2::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState};
+use crate::runtime2::connector::ConnectorScheduling;
+use crate::runtime2::consensus::{Consensus, Consistency};
+use crate::runtime2::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy};
+use crate::runtime2::inbox::PublicInbox;
+use crate::runtime2::native::Connector;
+use crate::runtime2::port::PortIdLocal;
+use crate::runtime2::scheduler::{ComponentCtxFancy, SchedulerCtx};
+
+pub(crate) struct ConnectorPublic {
+    pub inbox: PublicInbox,
+    pub sleeping: AtomicBool,
+}
+
+impl ConnectorPublic {
+    pub fn new(initialize_as_sleeping: bool) -> Self {
+        ConnectorPublic{
+            inbox: PublicInbox::new(),
+            sleeping: AtomicBool::new(initialize_as_sleeping),
+        }
+    }
+}
+
+pub(crate) struct ConnectorPDL {
+    tree: ExecTree,
+    consensus: Consensus,
+    branch_workspace: Vec<BranchId>,
+}
+
+struct ConnectorRunContext {}
+
+impl RunContext for ConnectorRunContext {
+    fn did_put(&mut self, port: PortId) -> bool {
+        todo!()
+    }
+
+    fn get(&mut self, port: PortId) -> Option<ValueGroup> {
+        todo!()
+    }
+
+    fn fires(&mut self, port: PortId) -> Option<Value> {
+        todo!()
+    }
+
+    fn get_channel(&mut self) -> Option<(Value, Value)> {
+        todo!()
+    }
+}
+
+impl Connector for ConnectorPDL {
+    fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
+        todo!()
+    }
+}
+
+impl ConnectorPDL {
+    pub fn new(initial: ComponentState, owned_ports: Vec<PortIdLocal>) -> Self {
+        Self{
+            tree: ExecTree::new(initial),
+            consensus: Consensus::new(),
+            branch_workspace: Vec::new(),
+        }
+    }
+
+    // --- Handling messages
+
+    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) {
+        while let Some(message) = ctx.read_next_message() {
+            match message {
+                MessageFancy::Data(message) => self.handle_new_data_message(message, ctx),
+                MessageFancy::Sync(message) => self.handle_new_sync_message(message, ctx),
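+                // Control messages are handled by the scheduler and never end
+                // up in the component's inbox (see `ComponentCtxFancy`).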
+                MessageFancy::Control(_) => unreachable!("control message in component"),
+            }
+        }
+    }
+
+    pub fn handle_new_data_message(&mut self, message: DataMessageFancy, ctx: &mut ComponentCtxFancy) {
+        // Go through all branches that are awaiting new messages and see if
+        // there is one that can receive this message.
+        debug_assert!(self.branch_workspace.is_empty());
+        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
+        self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut self.branch_workspace);
+
+        for branch_id in self.branch_workspace.drain(..) {
+            // This branch can receive, so fork and give it the message
+            let receiving_branch_id = self.tree.fork_branch(branch_id);
+            self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
+            let receiving_branch = &mut self.tree[receiving_branch_id];
+
+            receiving_branch.insert_message(message.data_header.target_port, message.content.clone());
+            self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header);
+
+            // And prepare the branch for running
+            self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
+        }
+    }
+
+    pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) {
+        self.consensus.handle_received_sync_header(&message.sync_header, ctx);
+        todo!("handle content of message?");
+    }
+
+    // --- Running code
+
+    pub fn run_in_sync_mode(&mut self, sched_ctx: &mut SchedulerCtx, comp_ctx: &mut ComponentCtxFancy) -> ConnectorScheduling {
+        // Check if we have any branch that needs running
+        let branch_id = self.tree.pop_from_queue(QueueKind::Runnable);
+        if branch_id.is_none() {
+            return ConnectorScheduling::NotNow;
+        }
+
+        // Retrieve the branch and run it
+        let branch_id = branch_id.unwrap();
+        let branch = &mut self.tree[branch_id];
+
+        let mut run_context = ConnectorRunContext{};
+        let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description);
+
+        // Handle the returned result. Note that this match statement contains
+        // explicit returns in case the run result requires that the component's
+        // code is run again immediately.
+        match run_result {
+            RunResult::BranchInconsistent => {
+                // Branch became inconsistent
+                branch.sync_state = SpeculativeState::Inconsistent;
+            },
+            RunResult::BranchMissingPortState(port_id) => {
+                // Branch called `fires()` on a port whose speculative firing
+                // state is not yet known.
+                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
+
+                // Create two forks, one that assumes the port will fire, and
+                // one that assumes the port remains silent
+                branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
+
+                let firing_branch_id = self.tree.fork_branch(branch_id);
+                let silent_branch_id = self.tree.fork_branch(branch_id);
+                self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
+                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
+                debug_assert_eq!(_result, Consistency::Valid);
+                self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
+                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
+                debug_assert_eq!(_result, Consistency::Valid);
+
+                // Somewhat important: we push the firing one first, so that
+                // that branch is run again immediately.
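+                // The silent fork stays in the runnable queue and will be
+                // executed on a later pass.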
+                self.tree.push_into_queue(QueueKind::Runnable, firing_branch_id);
+                self.tree.push_into_queue(QueueKind::Runnable, silent_branch_id);
+
+                return ConnectorScheduling::Immediate;
+            },
+            RunResult::BranchMissingPortValue(port_id) => {
+                // Branch performed a `get()` on a port for which it has not
+                // yet received a message.
+                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
+                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
+                if consistency == Consistency::Valid {
+                    // `get()` is valid, so mark the branch as awaiting a message
+                    branch.sync_state = SpeculativeState::HaltedAtBranchPoint;
+                    branch.awaiting_port = port_id;
+                    self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id);
+
+                    // Note: we only know that a branch is waiting on a message when
+                    // it reaches the `get` call. But we might have already received
+                    // a message that targets this branch, so check now.
+                    let mut any_branch_received = false;
+                    for message in comp_ctx.get_read_data_messages(port_id) {
+                        if self.consensus.branch_can_receive(branch_id, &message.data_header) {
+                            // This branch can receive the message, so we do the
+                            // fork-and-receive dance
+                            let recv_branch_id = self.tree.fork_branch(branch_id);
+                            let branch = &mut self.tree[recv_branch_id];
+                            branch.insert_message(port_id, message.content.clone());
+
+                            self.consensus.notify_of_new_branch(branch_id, recv_branch_id);
+                            self.consensus.notify_of_received_message(recv_branch_id, &message.data_header);
+                            self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id);
+
+                            any_branch_received = true;
+                        }
+                    }
+
+                    if any_branch_received {
+                        return ConnectorScheduling::Immediate;
+                    }
+                } else {
+                    branch.sync_state = SpeculativeState::Inconsistent;
+                }
+            },
+            RunResult::BranchAtSyncEnd => {
+                let consistency = self.consensus.notify_of_finished_branch(branch_id);
+                if consistency == Consistency::Valid {
+                    branch.sync_state = SpeculativeState::ReachedSyncEnd;
+                    self.tree.push_into_queue(QueueKind::FinishedSync, branch_id);
+                } else if consistency == Consistency::Inconsistent {
+                    branch.sync_state = SpeculativeState::Inconsistent;
+                }
+            },
+            RunResult::BranchPut(port_id, contents) => {
+                // Branch is attempting to send data
+                let port_id = PortIdLocal::new(port_id.0.u32_suffix);
+                let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true);
+                if consistency == Consistency::Valid {
+                    // `put()` is valid. TODO: Prepare the message through the
+                    // consensus algorithm and send it.
+                    todo!("prepare and send message");
+                } else {
+                    branch.sync_state = SpeculativeState::Inconsistent;
+                }
+            },
+            _ => unreachable!("unexpected run result {:?} in sync mode", run_result),
+        }
+
+        // If here then the run result did not require a particular action. We
+        // return whether we have more active branches to run or not.
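+        // `NotNow`: nothing left to run until a new message arrives;
+        // `Later`: there are still runnable branches, so ask to be scheduled again.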
+        if self.tree.queue_is_empty(QueueKind::Runnable) {
+            return ConnectorScheduling::NotNow;
+        } else {
+            return ConnectorScheduling::Later;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..4e2be12a01bfddf3d22459770d76da2f91bfe2a1 100644
--- a/src/runtime2/consensus.rs
+++ b/src/runtime2/consensus.rs
@@ -0,0 +1,199 @@
+
+use crate::protocol::eval::ValueGroup;
+use crate::runtime2::branch::{BranchId, ExecTree, QueueKind};
+use crate::runtime2::ConnectorId;
+use crate::runtime2::inbox2::{DataHeader, SyncHeader};
+use crate::runtime2::port::PortIdLocal;
+use crate::runtime2::scheduler::ComponentCtxFancy;
+
+use super::inbox2::PortAnnotation;
+
+struct BranchAnnotation {
+    port_mapping: Vec<PortAnnotation>,
+}
+
+/// The consensus algorithm. Currently it is only implemented to find the
+/// component with the highest ID within the sync region and to let that
+/// component handle all the local solutions.
+///
+/// The type itself serves as an experiment to see how code should be organized.
+// TODO: Flatten all datastructures
+pub(crate) struct Consensus {
+    highest_connector_id: ConnectorId,
+    branch_annotations: Vec<BranchAnnotation>,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) enum Consistency {
+    Valid,
+    Inconsistent,
+}
+
+impl Consensus {
+    pub fn new() -> Self {
+        return Self {
+            highest_connector_id: ConnectorId::new_invalid(),
+            branch_annotations: Vec::new(),
+        }
+    }
+
+    // --- Controlling sync round and branches
+
+    /// Sets up the consensus algorithm for a new synchronous round. The
+    /// provided ports should be the ports the component owns at the start of
+    /// the sync round.
+    pub fn start_sync(&mut self, ports: &[PortIdLocal]) {
+        debug_assert!(self.branch_annotations.is_empty());
+        debug_assert!(!self.highest_connector_id.is_valid());
+
+        // We'll use the first "branch" (the non-sync one) to store our ports,
+        // this allows cloning if we created a new branch.
+        self.branch_annotations.push(BranchAnnotation{
+            port_mapping: ports.iter()
+                .map(|v| PortAnnotation{
+                    port_id: *v,
+                    registered_id: None,
+                    expected_firing: None,
+                })
+                .collect(),
+        });
+    }
+
+    /// Notifies the consensus algorithm that a new branch has appeared. Must be
+    /// called for each forked branch in the execution tree.
+    pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) {
+        // If called correctly, then each time we are notified the new branch's
+        // index equals the current length of `branch_annotations`.
+        debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize);
+        let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize];
+        let new_branch_annotations = BranchAnnotation{
+            port_mapping: parent_branch_annotations.port_mapping.clone(),
+        };
+        self.branch_annotations.push(new_branch_annotations);
+    }
+
+    /// Notifies the consensus algorithm that a branch has reached the end of
+    /// the sync block. A final consistency check is performed; the caller has
+    /// to handle the result.
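+    /// (A branch is consistent if every port that was speculatively marked as
+    /// firing actually had a message registered on it, and silent ports did not.)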
+    pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency {
+        let branch = &self.branch_annotations[branch_id.index as usize];
+        for mapping in &branch.port_mapping {
+            match mapping.expected_firing {
+                Some(expected) => {
+                    if expected != mapping.registered_id.is_some() {
+                        // Inconsistent speculative state and actual state
+                        debug_assert!(mapping.registered_id.is_none()); // because if we did fire on a silent port, we should've caught that earlier
+                        return Consistency::Inconsistent;
+                    }
+                },
+                None => {},
+            }
+        }
+
+        return Consistency::Valid;
+    }
+
+    /// Notifies the consensus algorithm that a particular branch has assumed
+    /// a speculative value for its port mapping.
+    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool) -> Consistency {
+        let branch = &mut self.branch_annotations[branch_id.index as usize];
+        for mapping in &mut branch.port_mapping {
+            if mapping.port_id == port_id {
+                match mapping.expected_firing {
+                    None => {
+                        // Not yet mapped, perform speculative mapping
+                        mapping.expected_firing = Some(does_fire);
+                        return Consistency::Valid;
+                    },
+                    Some(current) => {
+                        // Already mapped
+                        if current == does_fire {
+                            return Consistency::Valid;
+                        } else {
+                            return Consistency::Inconsistent;
+                        }
+                    }
+                }
+            }
+        }
+
+        unreachable!("notify_of_speculative_mapping called with unowned port");
+    }
+
+    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
+        todo!("write");
+    }
+
+    // --- Handling messages
+
+    /// Prepares a message for sending. Caller should have made sure that
+    /// sending the message is consistent with the speculative state.
+    pub fn prepare_message(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, value: &ValueGroup) -> (SyncHeader, DataHeader) {
+        if cfg!(debug_assertions) {
+            let branch = &self.branch_annotations[branch_id.index as usize];
+            let port = branch.port_mapping.iter()
+                .find(|v| v.port_id == source_port_id)
+                .unwrap();
+            debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
+        }
+
+        todo!("construct the sync header and the data header");
+    }
+
+    pub fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtxFancy) {
+        todo!("should check IDs and maybe send sync messages");
+    }
+
+    /// Checks the data header and consults the stored port mapping and the
+    /// execution tree to see which branches may receive the data message's
+    /// contents.
+    ///
+    /// This function is generally called for freshly received messages that
+    /// should be matched against previously halted branches.
+    pub fn handle_received_data_header(&mut self, exec_tree: &ExecTree, data_header: &DataHeader, target_ids: &mut Vec<BranchId>) {
+        for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage) {
+            if branch.awaiting_port == data_header.target_port {
+                // Found a branch awaiting the message, but we need to make sure
+                // the mapping is correct
+                if self.branch_can_receive(branch.id, data_header) {
+                    target_ids.push(branch.id);
+                }
+            }
+        }
+    }
+
+    pub fn notify_of_received_message(&mut self, branch_id: BranchId, data_header: &DataHeader) {
+        debug_assert!(self.branch_can_receive(branch_id, data_header));
+        let branch = &mut self.branch_annotations[branch_id.index as usize];
+        for mapping in &mut branch.port_mapping {
+            if mapping.port_id == data_header.target_port {
+                mapping.registered_id = Some(data_header.new_mapping);
+                return;
+            }
+        }
+
+        // If here, then the branch didn't actually own the port? That means the
+        // caller made a mistake.
+        unreachable!("incorrect notify_of_received_message");
+    }
+
+    /// Matches the mapping between the branch and the data message. If they
+    /// match then the branch can receive the message.
+    pub(crate) fn branch_can_receive(&self, branch_id: BranchId, data_header: &DataHeader) -> bool {
+        let annotation = &self.branch_annotations[branch_id.index as usize];
+        for expected in &data_header.expected_mapping {
+            // If we own the port, then we have an entry in the
+            // annotation, check if the current mapping matches
+            for current in &annotation.port_mapping {
+                if expected.port_id == current.port_id {
+                    if expected.registered_id != current.registered_id {
+                        // IDs do not match, we cannot receive the
+                        // message in this branch
+                        return false;
+                    }
+                }
+            }
+        }
+
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/src/runtime2/inbox2.rs b/src/runtime2/inbox2.rs
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..d07e95d6839724f9c0b4ae5066a4c42688dc893b 100644
--- a/src/runtime2/inbox2.rs
+++ b/src/runtime2/inbox2.rs
@@ -0,0 +1,64 @@
+use crate::protocol::eval::ValueGroup;
+use crate::runtime2::branch::BranchId;
+use crate::runtime2::ConnectorId;
+use crate::runtime2::port::PortIdLocal;
+
+#[derive(Copy, Clone)]
+pub(crate) struct PortAnnotation {
+    pub port_id: PortIdLocal,
+    pub registered_id: Option<BranchId>,
+    pub expected_firing: Option<bool>,
+}
+
+/// The header added by the synchronization algorithm to all messages.
+pub(crate) struct SyncHeader {
+    pub sending_component_id: ConnectorId,
+    pub highest_component_id: ConnectorId,
+}
+
+/// The header added to data messages
+pub(crate) struct DataHeader {
+    pub expected_mapping: Vec<PortAnnotation>,
+    pub target_port: PortIdLocal,
+    pub new_mapping: BranchId,
+}
+
+/// A data message is a message that is intended for the receiver's PDL code,
+/// but will also be handled by the consensus algorithm
+pub(crate) struct DataMessageFancy {
+    pub sync_header: SyncHeader,
+    pub data_header: DataHeader,
+    pub content: ValueGroup,
+}
+
+pub(crate) enum SyncContent {
+
+}
+
+/// A sync message is a message that is intended only for the consensus
+/// algorithm.
+pub(crate) struct SyncMessageFancy {
+    pub sync_header: SyncHeader,
+    pub content: SyncContent,
+}
+
+/// A control message is a message intended for the scheduler that is executing
+/// a component.
+pub(crate) struct ControlMessageFancy {
+    pub id: u32, // generic identifier, used to match a request to its response
+    pub content: ControlContent,
+}
+
+pub(crate) enum ControlContent {
+    PortPeerChanged(PortIdLocal, ConnectorId),
+    CloseChannel(PortIdLocal),
+    Ack,
+    Ping,
+}
+
+/// Combination of data, sync and control messages.
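+/// Only the data and sync variants end up in a component's inbox; control
+/// messages are consumed by the scheduler itself.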
+pub(crate) enum MessageFancy {
+    Data(DataMessageFancy),
+    Sync(SyncMessageFancy),
+    Control(ControlMessageFancy),
+}
\ No newline at end of file
diff --git a/src/runtime2/mod.rs b/src/runtime2/mod.rs
index 1e64664226ec45f096d486297005f7df11d9f0eb..1fb96a52dc243346639b4e20266a1edb5f76d996 100644
--- a/src/runtime2/mod.rs
+++ b/src/runtime2/mod.rs
@@ -8,8 +8,11 @@ mod native;
 mod port;
 mod scheduler;
 mod inbox;
+mod consensus;
+mod inbox2;
 #[cfg(test)] mod tests;
+mod connector2;

 // Imports

diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index a17d66cc0b843cdbd460a5bc7518bd137b1a8efd..44e212f8d94a70c3bcd30bdd64c4cf168ea5433a 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -1,6 +1,7 @@
 use std::collections::VecDeque;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
+use crate::runtime2::inbox2::{DataMessageFancy, MessageFancy};

 use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey, ConnectorVariant};
 use super::port::{Port, PortState, PortIdLocal};
@@ -405,7 +406,7 @@ pub(crate) struct ComponentCtxFancy {
     // Mostly managed by the scheduler
     pub(crate) id: ConnectorId,
     ports: Vec<Port>,
-    inbox_messages: Vec<Message>, // never control or ping messages
+    inbox_messages: Vec<MessageFancy>, // never control or ping messages
     inbox_len_read: usize,
     // Submitted by the component
     is_in_sync: bool,
@@ -499,19 +500,19 @@ impl ComponentCtxFancy {
     /// Retrieves messages matching a particular port and branch id. But only
     /// those messages that have been previously received with
     /// `read_next_message`.
-    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal, match_prev_branch_id: BranchId) -> MessagesIter {
+    pub(crate) fn get_read_data_messages(&self, match_port_id: PortIdLocal) -> MessagesIter {
         return MessagesIter {
             messages: &self.inbox_messages,
             next_index: 0,
             max_index: self.inbox_len_read,
-            match_port_id, match_prev_branch_id
+            match_port_id
         };
     }

     /// Retrieves the next unread message from the inbox, `None` if there are no
     /// (new) messages to read.
     // TODO: Fix the clone of the data message, entirely unnecessary
-    pub(crate) fn read_next_message(&mut self) -> Option<Message> {
+    pub(crate) fn read_next_message(&mut self) -> Option<MessageFancy> {
         if !self.is_in_sync { return None; }
         if self.inbox_len_read == self.inbox_messages.len() { return None; }
@@ -533,22 +534,21 @@
 }

 pub(crate) struct MessagesIter<'a> {
-    messages: &'a [Message],
+    messages: &'a [MessageFancy],
     next_index: usize,
     max_index: usize,
     match_port_id: PortIdLocal,
-    match_prev_branch_id: BranchId,
 }

 impl<'a> Iterator for MessagesIter<'a> {
-    type Item = &'a DataMessage;
+    type Item = &'a DataMessageFancy;

     fn next(&mut self) -> Option<Self::Item> {
         // Loop until match is found or at end of messages
         while self.next_index < self.max_index {
             let message = &self.messages[self.next_index];
-            if let MessageContents::Data(data_message) = &message.contents {
-                if message.receiving_port == self.match_port_id && data_message.sender_prev_branch_id == self.match_prev_branch_id {
+            if let MessageFancy::Data(message) = &message {
+                if message.data_header.target_port == self.match_port_id {
                     // Found a match
                     self.next_index += 1;
                     return Some(data_message);