From 252d005a21e30f73e7101e3aeb61e392f70efaa9 2021-11-12 12:39:10 From: MH Date: 2021-11-12 12:39:10 Subject: [PATCH] initial API implementation --- diff --git a/src/runtime2/branch.rs b/src/runtime2/branch.rs index 465d52f779922ffc6ca94a0c13980c9317ef4f95..a2a9221a466b21ce5b0dba5fcc3ddd14149c17b2 100644 --- a/src/runtime2/branch.rs +++ b/src/runtime2/branch.rs @@ -8,9 +8,9 @@ use super::port::PortIdLocal; // To share some logic between the FakeTree and ExecTree implementation trait BranchListItem { - #[inline] fn get_id(&self) -> BranchId; - #[inline] fn set_next_id(&mut self, id: BranchId); - #[inline] fn get_next_id(&self) -> BranchId; + fn get_id(&self) -> BranchId; + fn set_next_id(&mut self, id: BranchId); + fn get_next_id(&self) -> BranchId; } /// Generic branch ID. A component will always have one branch: the @@ -144,7 +144,7 @@ impl BranchQueue { const NUM_QUEUES: usize = 3; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(crate) enum QueueKind { Runnable, AwaitingMessage, @@ -219,16 +219,25 @@ impl ExecTree { return &mut self.branches[0]; } - /// Returns an iterator over all the elements in the queue of the given - /// kind. One can start the iteration at the branch *after* the provided - /// branch. Just make sure it actually is in the provided queue. - pub fn iter_queue(&self, kind: QueueKind, start_after: Option) -> BranchQueueIter<'_, Branch> { - // Make sure branch is in correct queue while in debug mode - debug_assert!(start_after - .map(|branch_id| self.iter_queue(kind, None).any(|v| v.id == branch_id)) - .unwrap_or(true)); + /// Returns the branch ID of the first branch in a particular queue. + pub fn get_queue_first(&self, kind: QueueKind) -> Option { let queue = &self.queues[kind.as_index()]; - return iter_queue(queue, &self.branches, start_after); + if queue.first.is_valid() { + return Some(queue.first); + } else { + return None; + } + } + + /// Returns the next branch ID of a branch (assumed to be in a particular + /// queue. + pub fn get_queue_next(&self, branch_id: BranchId) -> Option { + let branch = &self.branches[branch_id.index as usize]; + if branch.next_in_queue.is_valid() { + return Some(branch.next_in_queue); + } else { + return None; + } } /// Returns an iterator that starts with the provided branch, and then @@ -270,7 +279,6 @@ impl ExecTree { /// using the provided branch as the final sync result. pub fn end_sync(&mut self, branch_id: BranchId) { debug_assert!(self.is_in_sync()); - debug_assert!(self.iter_queue(QueueKind::FinishedSync, None).any(|v| v.id == branch_id)); // Swap indicated branch into the first position self.branches.swap(0, branch_id.index as usize); @@ -309,27 +317,6 @@ impl IndexMut for ExecTree { } } -/// Iterator over branches in a `ExecTree` queue. -pub(crate) struct BranchQueueIter<'a, B: BranchListItem> { - branches: &'a [B], - index: usize, -} - -impl<'a, B: BranchListItem> Iterator for BranchQueueIter<'a, B> { - type Item = &'a B; - - fn next(&mut self) -> Option { - if self.index == 0 { - // i.e. the invalid branch index - return None; - } - - let branch = &self.branches[self.index]; - self.index = branch.get_next_id().index as usize; - return Some(branch); - } -} - /// Iterator over the parents of an `ExecTree` branch. pub(crate) struct BranchParentIter<'a> { branches: &'a [Branch], @@ -374,10 +361,10 @@ impl BranchListItem for FakeBranch { } impl FakeBranch { - fn new_root(index: u32) -> FakeBranch { - debug_assert!(index == 0); + fn new_root(_index: u32) -> FakeBranch { + debug_assert!(_index == 1); return FakeBranch{ - id: BranchId::new_invalid(), + id: BranchId::new(1), parent_id: BranchId::new_invalid(), sync_state: SpeculativeState::RunningInSync, awaiting_port: PortIdLocal::new_invalid(), @@ -415,8 +402,21 @@ pub(crate) struct FakeTree { impl FakeTree { pub fn new() -> Self { + // TODO: Don't like this? Cause is that now we don't have a non-sync + // branch. But we assumed BranchId=0 means the branch is invalid. We + // can do the rusty Option stuff. But we still need a token + // value within the protocol to signify no-branch-id. Maybe the high + // bit? Branches are crazy expensive, no-one is going to have 2^32 + // branches anyway. 2^31 isn't too bad. return Self { - branches: Vec::new(), + branches: vec![FakeBranch{ + id: BranchId::new_invalid(), + parent_id: BranchId::new_invalid(), + sync_state: SpeculativeState::RunningNonSync, + awaiting_port: PortIdLocal::new_invalid(), + next_in_queue: BranchId::new_invalid(), + inbox: HashMap::new(), + }], queues: [BranchQueue::new(); 3] } } @@ -438,12 +438,22 @@ impl FakeTree { push_into_queue(&mut self.queues[kind.as_index()], &mut self.branches, id); } - pub fn iter_queue(&self, kind: QueueKind, start_after: Option) -> BranchQueueIter<'_, FakeBranch> { - debug_assert!(start_after - .map(|branch_id| self.iter_queue(kind, None).any(|v| v.id == branch_id)) - .unwrap_or(true) - ); - return iter_queue(&self.queues[kind.as_index()], &self.branches, start_after); + pub fn get_queue_first(&self, kind: QueueKind) -> Option { + let queue = &self.queues[kind.as_index()]; + if queue.first.is_valid() { + return Some(queue.first) + } else { + return None; + } + } + + pub fn get_queue_next(&self, branch_id: BranchId) -> Option { + let branch = &self.branches[branch_id.index as usize]; + if branch.next_in_queue.is_valid() { + return Some(branch.next_in_queue); + } else { + return None; + } } pub fn start_sync(&mut self) -> BranchId { @@ -468,12 +478,13 @@ impl FakeTree { } pub fn end_sync(&mut self, branch_id: BranchId) -> FakeBranch { + debug_assert!(branch_id.is_valid()); debug_assert!(self.is_in_sync()); - debug_assert!(self.iter_queue(QueueKind::FinishedSync, None).any(|v| v.id == BranchId)); // Take out the succeeding branch, then just clear all fake branches. - let mut iter = self.branches.drain(branch_id.index..); - let result = iter.next().unwrap(); + self.branches.swap(1, branch_id.index as usize); + self.branches.truncate(2); + let result = self.branches.pop().unwrap(); for queue_index in 0..NUM_QUEUES { self.queues[queue_index] = BranchQueue::new(); @@ -522,23 +533,8 @@ fn push_into_queue(queue: &mut BranchQueue, branches: &mut [B queue.first = branch_id; queue.last = branch_id; } else { - let last_branch = &mut branches[queue.last as usize]; + let last_branch = &mut branches[queue.last.index as usize]; last_branch.set_next_id(branch_id); queue.last = branch_id; } -} - -fn iter_queue<'a, B: BranchListItem>(queue: &BranchQueue, branches: &'a [B], start_after: Option) -> BranchQueueIter<'a, B> { - let index = match start_after { - Some(branch_id) => { - // Assuming caller is correct and that the branch is in the queue - let first_branch = &branches[branch_id.index as usize]; - first_branch.get_next_id().index as usize - }, - None => { - queue.first.index as usize - } - }; - - return BranchQueueIter{ branches, index }; } \ No newline at end of file diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index bdda7503cfdd8efaac4edfa57a0e2365ff67c1cc..28756d719a417835b7988cfe2b8e6b33cd994fa1 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -66,6 +66,7 @@ pub(crate) enum ConnectorScheduling { pub(crate) struct ConnectorPDL { tree: ExecTree, consensus: Consensus, + last_finished_handled: Option, } struct ConnectorRunContext<'a> { @@ -106,13 +107,26 @@ impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { self.handle_new_messages(comp_ctx); if self.tree.is_in_sync() { + // Run in sync mode let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); - if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { - self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); - return ConnectorScheduling::Immediate; - } else { - return scheduling + + // Handle any new finished branches + let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + self.last_finished_handled = Some(branch_id); + + + if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + // Actually found a solution + self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); + return ConnectorScheduling::Immediate; + } + + self.last_finished_handled = Some(branch_id); } + + return scheduling; } else { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); return scheduling; @@ -125,6 +139,7 @@ impl ConnectorPDL { Self{ tree: ExecTree::new(initial), consensus: Consensus::new(), + last_finished_handled: None, } } @@ -143,21 +158,26 @@ impl ConnectorPDL { pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. - debug_assert!(ctx.workspace_branches.is_empty()); - let mut branches = Vec::new(); // TODO: @Remove - if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) { + if !self.consensus.handle_new_data_message(&message, ctx) { // Old message, so drop it return; } - for branch_id in branches.drain(..) { + let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + + let branch = &self.tree[branch_id]; + if branch.awaiting_port != message.data_header.target_port { continue; } + if !self.consensus.branch_can_receive(branch_id, &message) { continue; } + // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); + self.consensus.notify_of_received_message(receiving_branch_id, &message); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); @@ -241,7 +261,7 @@ impl ConnectorPDL { // a message that targets this branch, so check now. let mut any_message_received = false; for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) { + if self.consensus.branch_can_receive(branch_id, &message) { // This branch can receive the message, so we do the // fork-and-receive dance let receiving_branch_id = self.tree.fork_branch(branch_id); @@ -250,7 +270,7 @@ impl ConnectorPDL { branch.insert_message(port_id, message.content.as_message().unwrap().clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); + self.consensus.notify_of_received_message(receiving_branch_id, &message); self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_message_received = true; @@ -327,6 +347,7 @@ impl ConnectorPDL { RunResult::ComponentAtSyncStart => { comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); + debug_assert!(self.last_finished_handled.is_none()); self.consensus.start_sync(comp_ctx); self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); @@ -381,5 +402,6 @@ impl ConnectorPDL { } ctx.notify_sync_end(&[]); + self.last_finished_handled = None; } } \ No newline at end of file diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index f69f1392977a41b4413618fefe4e1b13441230ea..91764ee546a1731534007775d44156f10b8958f7 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -1,10 +1,9 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; -use crate::runtime2::branch::BranchQueueIter; -use super::branch::{BranchId, ExecTree, QueueKind}; use super::ConnectorId; +use super::branch::BranchId; use super::port::{ChannelId, PortIdLocal}; use super::inbox::{ Message, PortAnnotation, @@ -49,13 +48,13 @@ struct Peer { // TODO: Have a "branch+port position hint" in case multiple operations are // performed on the same port to prevent repeated lookups // TODO: A lot of stuff should be batched. Like checking all the sync headers -// and sending "I have a higher ID" messages. +// and sending "I have a higher ID" messages. Should reduce locking by quite a +// bit. pub(crate) struct Consensus { // --- State that is cleared after each round // Local component's state highest_connector_id: ConnectorId, branch_annotations: Vec, - last_finished_handled: Option, // Gathered state from communication encountered_ports: VecSet, // to determine if we should send "port remains silent" messages. solution_combiner: SolutionCombiner, @@ -77,7 +76,6 @@ impl Consensus { return Self { highest_connector_id: ConnectorId::new_invalid(), branch_annotations: Vec::new(), - last_finished_handled: None, encountered_ports: VecSet::new(), solution_combiner: SolutionCombiner::new(), peers: Vec::new(), @@ -106,7 +104,6 @@ impl Consensus { pub fn start_sync(&mut self, ctx: &ComponentCtx) { debug_assert!(!self.highest_connector_id.is_valid()); debug_assert!(self.branch_annotations.is_empty()); - debug_assert!(self.last_finished_handled.is_none()); debug_assert!(self.solution_combiner.local.is_empty()); // We'll use the first "branch" (the non-sync one) to store our ports, @@ -188,68 +185,59 @@ impl Consensus { unreachable!("notify_of_speculative_mapping called with unowned port"); } - /// Generates sync messages for any branches that are at the end of the - /// sync block. To find these branches, they should've been put in the - /// "finished" queue in the execution tree. - pub fn handle_new_finished_sync_branches(&mut self, tree: &ExecTree, ctx: &mut ComponentCtx) -> Option { - debug_assert!(self.is_in_sync()); - - let mut last_branch_id = self.last_finished_handled; - for branch in tree.iter_queue(QueueKind::FinishedSync, last_branch_id) { - // Turn the port mapping into a local solution - let source_mapping = &self.branch_annotations[branch.id.index as usize].port_mapping; - let mut target_mapping = Vec::with_capacity(source_mapping.len()); - - for port in source_mapping { - // Note: if the port is silent, and we've never communicated - // over the port, then we need to do so now, to let the peer - // component know about our sync leader state. - let port_desc = ctx.get_port_by_id(port.port_id).unwrap(); - let peer_port_id = port_desc.peer_id; - let channel_id = port_desc.channel_id; - - if !self.encountered_ports.contains(&port.port_id) { - ctx.submit_message(Message::Data(DataMessage { - sync_header: SyncHeader{ - sending_component_id: ctx.id, - highest_component_id: self.highest_connector_id, - sync_round: self.sync_round - }, - data_header: DataHeader{ - expected_mapping: source_mapping.clone(), - sending_port: port.port_id, - target_port: peer_port_id, - new_mapping: BranchId::new_invalid(), - }, - content: DataContent::SilentPortNotification, - })); - self.encountered_ports.push(port.port_id); - } - - target_mapping.push(( - channel_id, - port.registered_id.unwrap_or(BranchId::new_invalid()) - )); - } - - let local_solution = LocalSolution{ - component: ctx.id, - final_branch_id: branch.id, - port_mapping: target_mapping, - }; - let solution_branch = self.send_or_store_local_solution(local_solution, ctx); - if solution_branch.is_some() { - // No need to continue iterating, we've found the solution - return solution_branch; + /// Generates a new local solution from a finished branch. If the component + /// is not the leader of the sync region then it will be sent to the + /// appropriate component. If it is the leader then there is a chance that + /// this solution completes a global solution. In that case the solution + /// branch ID will be returned. + pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option { + // Turn the port mapping into a local solution + let source_mapping = &self.branch_annotations[branch_id.index as usize].port_mapping; + let mut target_mapping = Vec::with_capacity(source_mapping.len()); + + for port in source_mapping { + // Note: if the port is silent, and we've never communicated + // over the port, then we need to do so now, to let the peer + // component know about our sync leader state. + let port_desc = ctx.get_port_by_id(port.port_id).unwrap(); + let peer_port_id = port_desc.peer_id; + let channel_id = port_desc.channel_id; + + if !self.encountered_ports.contains(&port.port_id) { + ctx.submit_message(Message::Data(DataMessage { + sync_header: SyncHeader{ + sending_component_id: ctx.id, + highest_component_id: self.highest_connector_id, + sync_round: self.sync_round + }, + data_header: DataHeader{ + expected_mapping: source_mapping.clone(), + sending_port: port.port_id, + target_port: peer_port_id, + new_mapping: BranchId::new_invalid(), + }, + content: DataContent::SilentPortNotification, + })); + self.encountered_ports.push(port.port_id); } - last_branch_id = Some(branch.id); + target_mapping.push(( + channel_id, + port.registered_id.unwrap_or(BranchId::new_invalid()) + )); } - self.last_finished_handled = last_branch_id; - return None; + let local_solution = LocalSolution{ + component: ctx.id, + final_branch_id: branch_id, + port_mapping: target_mapping, + }; + let solution_branch = self.send_or_store_local_solution(local_solution, ctx); + return solution_branch; } + /// Notifies the consensus algorithm about the chosen branch to commit to + /// memory. pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec) { debug_assert!(self.is_in_sync()); @@ -264,7 +252,6 @@ impl Consensus { // Clear out internal storage to defaults self.highest_connector_id = ConnectorId::new_invalid(); self.branch_annotations.clear(); - self.last_finished_handled = None; self.encountered_ports.clear(); self.solution_combiner.clear(); @@ -325,15 +312,10 @@ impl Consensus { return (self.create_sync_header(ctx), data_header); } - /// Handles a new data message by handling the data and sync header, and - /// checking which *existing* branches *can* receive the message. So two - /// cautionary notes: - /// 1. A future branch might also be able to receive this message, see the - /// `branch_can_receive` function. - /// 2. We return the branches that *can* receive the message, you still - /// have to explicitly call `notify_of_received_message`. - pub fn handle_new_data_message(&mut self, potential_receivers: BranchQueueIter<'_, >, message: &DataMessage, ctx: &mut ComponentCtx, target_ids: &mut Vec) -> bool { - self.handle_received_data_header(exec_tree, &message.sync_header, &message.data_header, &message.content, target_ids); + /// Handles a new data message by handling the sync header. The caller is + /// responsible for checking for branches that might be able to receive + /// the message. + pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool { return self.handle_received_sync_header(&message.sync_header, ctx) } @@ -367,18 +349,18 @@ impl Consensus { } } - pub fn notify_of_received_message(&mut self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) { - debug_assert!(self.branch_can_receive(branch_id, sync_header, data_header, content)); + pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage) { + debug_assert!(self.branch_can_receive(branch_id, message)); let branch = &mut self.branch_annotations[branch_id.index as usize]; for mapping in &mut branch.port_mapping { - if mapping.port_id == data_header.target_port { + if mapping.port_id == message.data_header.target_port { // Found the port in which the message should be inserted - mapping.registered_id = Some(data_header.new_mapping); + mapping.registered_id = Some(message.data_header.new_mapping); // Check for sent ports debug_assert!(self.workspace_ports.is_empty()); - find_ports_in_value_group(content.as_message().unwrap(), &mut self.workspace_ports); + find_ports_in_value_group(message.content.as_message().unwrap(), &mut self.workspace_ports); if !self.workspace_ports.is_empty() { todo!("handle received ports"); self.workspace_ports.clear(); @@ -395,20 +377,20 @@ impl Consensus { /// Matches the mapping between the branch and the data message. If they /// match then the branch can receive the message. - pub fn branch_can_receive(&self, branch_id: BranchId, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent) -> bool { - if let Some(peer) = self.peers.iter().find(|v| v.id == sync_header.sending_component_id) { - if sync_header.sync_round < peer.expected_sync_round { + pub fn branch_can_receive(&self, branch_id: BranchId, message: &DataMessage) -> bool { + if let Some(peer) = self.peers.iter().find(|v| v.id == message.sync_header.sending_component_id) { + if message.sync_header.sync_round < peer.expected_sync_round { return false; } } - if let DataContent::SilentPortNotification = content { + if let DataContent::SilentPortNotification = message.content { // No port can receive a "silent" notification. return false; } let annotation = &self.branch_annotations[branch_id.index as usize]; - for expected in &data_header.expected_mapping { + for expected in &message.data_header.expected_mapping { // If we own the port, then we have an entry in the // annotation, check if the current mapping matches for current in &annotation.port_mapping { @@ -427,21 +409,6 @@ impl Consensus { // --- Internal helpers - /// Checks data header and consults the stored port mapping and the - /// execution tree to see which branches may receive the data message's - /// contents. - fn handle_received_data_header(&self, exec_tree: &ExecTree, sync_header: &SyncHeader, data_header: &DataHeader, content: &DataContent, target_ids: &mut Vec) { - for branch in exec_tree.iter_queue(QueueKind::AwaitingMessage, None) { - if branch.awaiting_port == data_header.target_port { - // Found a branch awaiting the message, but we need to make sure - // the mapping is correct - if self.branch_can_receive(branch.id, sync_header, data_header, content) { - target_ids.push(branch.id); - } - } - } - } - fn handle_received_sync_header(&mut self, sync_header: &SyncHeader, ctx: &mut ComponentCtx) -> bool { debug_assert!(sync_header.sending_component_id != ctx.id); // not sending to ourselves if !self.handle_peer(sync_header) { diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 10a1ec47a20a917d4a33df1692de024d68190edc..7b70bab641d8ae67019d6bfeaf8abd8c1a65dec8 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -1,19 +1,18 @@ use std::collections::VecDeque; use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::Ordering; +use std::collections::HashMap; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; -use crate::runtime2::branch::{FakeTree, QueueKind, SpeculativeState}; -use crate::runtime2::consensus::{Consensus, Consistency}; -use crate::runtime2::inbox::{DataContent, DataMessage, SyncMessage}; use super::{ConnectorKey, ConnectorId, RuntimeInner}; +use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState}; use super::scheduler::{SchedulerCtx, ComponentCtx}; use super::port::{Port, PortIdLocal, Channel, PortKind}; -use super::consensus::find_ports_in_value_group; +use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; use super::connector::{ConnectorScheduling, ConnectorPDL}; -use super::inbox::{Message, ControlContent, ControlMessage}; +use super::inbox::{Message, DataContent, DataMessage, SyncMessage, ControlContent, ControlMessage}; /// Generic connector interface from the scheduler's point of view. pub(crate) trait Connector { @@ -24,7 +23,12 @@ pub(crate) trait Connector { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling; } -type SyncDone = Arc<(Mutex, Condvar)>; +pub(crate) struct FinishedSync { + // In the order of the `get` calls + inbox: Vec, +} + +type SyncDone = Arc<(Mutex>, Condvar)>; type JobQueue = Arc>>; enum ApplicationJob { @@ -41,7 +45,9 @@ enum ApplicationJob { /// The connector which an application can directly interface with. Once may set /// up the next synchronous round, and retrieve the data afterwards. // TODO: Strong candidate for logic reduction in handling put/get. A lot of code -// is an approximate copy-pasta from the regular component logic. +// is an approximate copy-pasta from the regular component logic. I'm going to +// wait until I'm implementing more native components to see which logic is +// truly common. pub struct ConnectorApplication { // Communicating about new jobs and setting up sync rounds sync_done: SyncDone, @@ -49,15 +55,29 @@ pub struct ConnectorApplication { is_in_sync: bool, // Handling current sync round sync_desc: Vec, - exec_tree: FakeTree, + tree: FakeTree, consensus: Consensus, + last_finished_handled: Option, branch_extra: Vec, // instruction counter per branch } impl Connector for ConnectorApplication { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { if self.is_in_sync { - return self.run_in_sync_mode(sched_ctx, comp_ctx); + let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); + let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + self.last_finished_handled = Some(branch_id); + + if let Some(solution_branch) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + // Can finish sync round immediately + self.collapse_sync_to_solution_branch(solution_branch, comp_ctx); + return ConnectorScheduling::Immediate; + } + } + + return scheduling; } else { return self.run_in_deterministic_mode(sched_ctx, comp_ctx); } @@ -66,7 +86,7 @@ impl Connector for ConnectorApplication { impl ConnectorApplication { pub(crate) fn new(runtime: Arc) -> (Self, ApplicationInterface) { - let sync_done = Arc::new(( Mutex::new(false), Condvar::new() )); + let sync_done = Arc::new(( Mutex::new(None), Condvar::new() )); let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32))); let connector = ConnectorApplication { @@ -74,6 +94,10 @@ impl ConnectorApplication { job_queue: job_queue.clone(), is_in_sync: false, sync_desc: Vec::new(), + tree: FakeTree::new(), + consensus: Consensus::new(), + last_finished_handled: None, + branch_extra: Vec::new(), }; let interface = ApplicationInterface::new(sync_done, job_queue, runtime); @@ -90,31 +114,36 @@ impl ConnectorApplication { } } - pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { + pub(crate) fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. - debug_assert!(ctx.workspace_branches.is_empty()); - let mut branches = Vec::new(); // TODO: @Remove - if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) { + if !self.consensus.handle_new_data_message(&message, ctx) { // Old message, so drop it return; } - for branch_id in branches.drain(..) { + let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + + let branch = &self.tree[branch_id]; + if branch.awaiting_port != message.data_header.target_port { continue; } + if !self.consensus.branch_can_receive(branch_id, &message) { continue; } + // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); + self.consensus.notify_of_received_message(receiving_branch_id, &message); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); } } - pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { + pub(crate) fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { self.collapse_sync_to_solution_branch(solution_branch_id, ctx); } @@ -125,13 +154,13 @@ impl ConnectorApplication { self.handle_new_messages(comp_ctx); - let branch_id = self.exec_tree.pop_from_queue(QueueKind::Runnable); + let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); if branch_id.is_none() { return ConnectorScheduling::NotNow; } let branch_id = branch_id.unwrap(); - let branch = &mut self.exec_tree[branch_id]; + let branch = &mut self.tree[branch_id]; let mut instruction_idx = self.branch_extra[branch_id.index as usize]; if instruction_idx >= self.sync_desc.len() { @@ -140,7 +169,7 @@ impl ConnectorApplication { let consistency = self.consensus.notify_of_finished_branch(branch_id); if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; - self.exec_tree.push_into_queue(QueueKind::FinishedSync, branch_id); + self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); } else { branch.sync_state = SpeculativeState::Inconsistent; } @@ -154,14 +183,14 @@ impl ConnectorApplication { let port_id = *port_id; let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); if consistency == Consistency::Valid { - let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, ctx); + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); let message = Message::Data(DataMessage { sync_header, data_header, content: DataContent::Message(content.clone()), }); comp_ctx.submit_message(message); - self.exec_tree.push_into_queue(QueueKind::Runnable, branch_id); + self.tree.push_into_queue(QueueKind::Runnable, branch_id); return ConnectorScheduling::Immediate; } else { branch.sync_state = SpeculativeState::Inconsistent; @@ -173,23 +202,23 @@ impl ConnectorApplication { if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::HaltedAtBranchPoint; branch.awaiting_port = port_id; - self.exec_tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); let mut any_message_received = false; for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) { + if self.consensus.branch_can_receive(branch_id, &message) { // This branch can receive the message, so we do the // fork-and-receive dance - let receiving_branch_id = self.exec_tree.fork_branch(branch_id); - let branch = &mut self.exec_tree[receiving_branch_id]; + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); self.branch_extra.push(instruction_idx + 1); branch.insert_message(port_id, message.content.as_message().unwrap().clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); - self.exec_tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_message_received = true; } @@ -205,7 +234,7 @@ impl ConnectorApplication { } } - if self.exec_tree.queue_is_empty(QueueKind::Runnable) { + if self.tree.queue_is_empty(QueueKind::Runnable) { return ConnectorScheduling::NotNow; } else { return ConnectorScheduling::Later; @@ -230,18 +259,14 @@ impl ConnectorApplication { }, ApplicationJob::SyncRound(mut description) => { // Entering sync mode - if description.is_empty() { - // To simplify logic we always have one instruction - description.push(ApplicationSyncAction::Noop); - } - self.sync_desc = description; self.is_in_sync = true; + debug_assert!(self.last_finished_handled.is_none()); debug_assert!(self.branch_extra.is_empty()); - let first_branch_id = self.exec_tree.start_sync(); - self.exec_tree.push_into_queue(QueueKind::Runnable, first_branch_id); - self.consensus.start_sync(ctx); + let first_branch_id = self.tree.start_sync(); + self.tree.push_into_queue(QueueKind::Runnable, first_branch_id); + self.consensus.start_sync(comp_ctx); self.branch_extra.push(0); // set first branch to first instruction return ConnectorScheduling::Immediate; @@ -255,6 +280,43 @@ impl ConnectorApplication { return ConnectorScheduling::NotNow; } + + fn collapse_sync_to_solution_branch(&mut self, branch_id: BranchId, comp_ctx: &mut ComponentCtx) { + debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program + // Notifying tree, consensus algorithm and context of ending sync + let mut fake_vec = Vec::new(); + let mut solution_branch = self.tree.end_sync(branch_id); + self.consensus.end_sync(branch_id, &mut fake_vec); + + for port in fake_vec { + debug_assert!(comp_ctx.get_port_by_id(port).is_some()); + } + + comp_ctx.notify_sync_end(&[]); + + // Turning hashmapped inbox into vector of values + let mut inbox = Vec::with_capacity(solution_branch.inbox.len()); + for action in &self.sync_desc { + match action { + ApplicationSyncAction::Put(_, _) => {}, + ApplicationSyncAction::Get(port_id) => { + debug_assert!(solution_branch.inbox.contains_key(port_id)); + inbox.push(solution_branch.inbox.remove(port_id).unwrap()); + }, + } + } + + // Notifying interface of ending sync + self.is_in_sync = false; + self.sync_desc.clear(); + self.branch_extra.clear(); + self.last_finished_handled = None; + + let (results, notification) = &*self.sync_done; + let mut results = results.lock().unwrap(); + *results = Some(FinishedSync{ inbox }); + notification.notify_one(); + } } // ----------------------------------------------------------------------------- @@ -345,14 +407,14 @@ impl ApplicationInterface { let mut initial_ports = Vec::new(); find_ports_in_value_group(&arguments, &mut initial_ports); for initial_port in &initial_ports { - if !self.owned_ports.iter().any(|v| v == initial_port) { + if !self.owned_ports.iter().any(|(_, v)| v == initial_port) { return Err(ComponentCreationError::UnownedPort); } } // We own all ports, so remove them on this side for initial_port in &initial_ports { - let position = self.owned_ports.iter().position(|v| v == initial_port).unwrap(); + let position = self.owned_ports.iter().position(|(_, v)| v == initial_port).unwrap(); self.owned_ports.remove(position); } @@ -404,7 +466,7 @@ impl ApplicationInterface { { let (is_done, _) = &*self.sync_done; let mut lock = is_done.lock().unwrap(); - *lock = false; + *lock = None; } { @@ -423,8 +485,10 @@ impl ApplicationInterface { } let (is_done, condition) = &*self.sync_done; - let lock = is_done.lock().unwrap(); - condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done + let mut lock = is_done.lock().unwrap(); + lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done + + return Ok(lock.take().unwrap().inbox); } /// Called by runtime to set associated connector's ID. diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs index a5c2aac79bf56dedc089cee2118064d8f49f87c7..c3ac4c825bd954e54e1c2ab043465a0fb8f064a5 100644 --- a/src/runtime2/tests/mod.rs +++ b/src/runtime2/tests/mod.rs @@ -1,3 +1,5 @@ +mod network_shapes; + use super::*; use crate::{PortId, ProtocolDescription}; use crate::common::Id; @@ -75,7 +77,7 @@ fn test_put_and_get() { let thing = TestTimer::new("put_and_get"); run_test_in_runtime(CODE, |api| { - let channel = api.create_channel(); + let channel = api.create_channel().unwrap(); api.create_connector("", "putter", ValueGroup::new_stack(vec![ Value::Output(PortId(Id{ connector_id: 0, u32_suffix: channel.putter_id.index })), @@ -150,7 +152,7 @@ fn test_star_shaped_request() { api.create_connector("", "constructor", ValueGroup::new_stack(vec![ Value::UInt32(5), Value::UInt32(NUM_LOOPS), - ])); + ])).expect("create connector"); }); } @@ -227,6 +229,6 @@ fn test_conga_line_request() { api.create_connector("", "constructor", ValueGroup::new_stack(vec![ Value::UInt32(5), Value::UInt32(NUM_LOOPS) - ])); + ])).expect("create connector"); }); } \ No newline at end of file diff --git a/src/runtime2/tests/network_shapes.rs b/src/runtime2/tests/network_shapes.rs new file mode 100644 index 0000000000000000000000000000000000000000..850d3c3de768c8dc58bcca520a8e0afbafa96ea4 --- /dev/null +++ b/src/runtime2/tests/network_shapes.rs @@ -0,0 +1 @@ +// Testing particular graph shapes \ No newline at end of file