diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index 10a1ec47a20a917d4a33df1692de024d68190edc..7b70bab641d8ae67019d6bfeaf8abd8c1a65dec8 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -1,19 +1,18 @@ use std::collections::VecDeque; use std::sync::{Arc, Mutex, Condvar}; use std::sync::atomic::Ordering; +use std::collections::HashMap; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; -use crate::runtime2::branch::{FakeTree, QueueKind, SpeculativeState}; -use crate::runtime2::consensus::{Consensus, Consistency}; -use crate::runtime2::inbox::{DataContent, DataMessage, SyncMessage}; use super::{ConnectorKey, ConnectorId, RuntimeInner}; +use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState}; use super::scheduler::{SchedulerCtx, ComponentCtx}; use super::port::{Port, PortIdLocal, Channel, PortKind}; -use super::consensus::find_ports_in_value_group; +use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; use super::connector::{ConnectorScheduling, ConnectorPDL}; -use super::inbox::{Message, ControlContent, ControlMessage}; +use super::inbox::{Message, DataContent, DataMessage, SyncMessage, ControlContent, ControlMessage}; /// Generic connector interface from the scheduler's point of view. pub(crate) trait Connector { @@ -24,7 +23,12 @@ pub(crate) trait Connector { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling; } -type SyncDone = Arc<(Mutex, Condvar)>; +pub(crate) struct FinishedSync { + // In the order of the `get` calls + inbox: Vec, +} + +type SyncDone = Arc<(Mutex>, Condvar)>; type JobQueue = Arc>>; enum ApplicationJob { @@ -41,7 +45,9 @@ enum ApplicationJob { /// The connector which an application can directly interface with. Once may set /// up the next synchronous round, and retrieve the data afterwards. // TODO: Strong candidate for logic reduction in handling put/get. A lot of code -// is an approximate copy-pasta from the regular component logic. +// is an approximate copy-pasta from the regular component logic. I'm going to +// wait until I'm implementing more native components to see which logic is +// truly common. pub struct ConnectorApplication { // Communicating about new jobs and setting up sync rounds sync_done: SyncDone, @@ -49,15 +55,29 @@ pub struct ConnectorApplication { is_in_sync: bool, // Handling current sync round sync_desc: Vec, - exec_tree: FakeTree, + tree: FakeTree, consensus: Consensus, + last_finished_handled: Option, branch_extra: Vec, // instruction counter per branch } impl Connector for ConnectorApplication { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { if self.is_in_sync { - return self.run_in_sync_mode(sched_ctx, comp_ctx); + let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); + let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + self.last_finished_handled = Some(branch_id); + + if let Some(solution_branch) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + // Can finish sync round immediately + self.collapse_sync_to_solution_branch(solution_branch, comp_ctx); + return ConnectorScheduling::Immediate; + } + } + + return scheduling; } else { return self.run_in_deterministic_mode(sched_ctx, comp_ctx); } @@ -66,7 +86,7 @@ impl Connector for ConnectorApplication { impl ConnectorApplication { pub(crate) fn new(runtime: Arc) -> (Self, ApplicationInterface) { - let sync_done = Arc::new(( Mutex::new(false), Condvar::new() )); + let sync_done = Arc::new(( Mutex::new(None), Condvar::new() )); let job_queue = Arc::new(Mutex::new(VecDeque::with_capacity(32))); let connector = ConnectorApplication { @@ -74,6 +94,10 @@ impl ConnectorApplication { job_queue: job_queue.clone(), is_in_sync: false, sync_desc: Vec::new(), + tree: FakeTree::new(), + consensus: Consensus::new(), + last_finished_handled: None, + branch_extra: Vec::new(), }; let interface = ApplicationInterface::new(sync_done, job_queue, runtime); @@ -90,31 +114,36 @@ impl ConnectorApplication { } } - pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { + pub(crate) fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. - debug_assert!(ctx.workspace_branches.is_empty()); - let mut branches = Vec::new(); // TODO: @Remove - if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) { + if !self.consensus.handle_new_data_message(&message, ctx) { // Old message, so drop it return; } - for branch_id in branches.drain(..) { + let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + + let branch = &self.tree[branch_id]; + if branch.awaiting_port != message.data_header.target_port { continue; } + if !self.consensus.branch_can_receive(branch_id, &message) { continue; } + // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); + self.consensus.notify_of_received_message(receiving_branch_id, &message); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); } } - pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { + pub(crate) fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { self.collapse_sync_to_solution_branch(solution_branch_id, ctx); } @@ -125,13 +154,13 @@ impl ConnectorApplication { self.handle_new_messages(comp_ctx); - let branch_id = self.exec_tree.pop_from_queue(QueueKind::Runnable); + let branch_id = self.tree.pop_from_queue(QueueKind::Runnable); if branch_id.is_none() { return ConnectorScheduling::NotNow; } let branch_id = branch_id.unwrap(); - let branch = &mut self.exec_tree[branch_id]; + let branch = &mut self.tree[branch_id]; let mut instruction_idx = self.branch_extra[branch_id.index as usize]; if instruction_idx >= self.sync_desc.len() { @@ -140,7 +169,7 @@ impl ConnectorApplication { let consistency = self.consensus.notify_of_finished_branch(branch_id); if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::ReachedSyncEnd; - self.exec_tree.push_into_queue(QueueKind::FinishedSync, branch_id); + self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); } else { branch.sync_state = SpeculativeState::Inconsistent; } @@ -154,14 +183,14 @@ impl ConnectorApplication { let port_id = *port_id; let consistency = self.consensus.notify_of_speculative_mapping(branch_id, port_id, true); if consistency == Consistency::Valid { - let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, ctx); + let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); let message = Message::Data(DataMessage { sync_header, data_header, content: DataContent::Message(content.clone()), }); comp_ctx.submit_message(message); - self.exec_tree.push_into_queue(QueueKind::Runnable, branch_id); + self.tree.push_into_queue(QueueKind::Runnable, branch_id); return ConnectorScheduling::Immediate; } else { branch.sync_state = SpeculativeState::Inconsistent; @@ -173,23 +202,23 @@ impl ConnectorApplication { if consistency == Consistency::Valid { branch.sync_state = SpeculativeState::HaltedAtBranchPoint; branch.awaiting_port = port_id; - self.exec_tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); + self.tree.push_into_queue(QueueKind::AwaitingMessage, branch_id); let mut any_message_received = false; for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) { + if self.consensus.branch_can_receive(branch_id, &message) { // This branch can receive the message, so we do the // fork-and-receive dance - let receiving_branch_id = self.exec_tree.fork_branch(branch_id); - let branch = &mut self.exec_tree[receiving_branch_id]; + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; debug_assert!(receiving_branch_id.index as usize == self.branch_extra.len()); self.branch_extra.push(instruction_idx + 1); branch.insert_message(port_id, message.content.as_message().unwrap().clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); - self.exec_tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_message_received = true; } @@ -205,7 +234,7 @@ impl ConnectorApplication { } } - if self.exec_tree.queue_is_empty(QueueKind::Runnable) { + if self.tree.queue_is_empty(QueueKind::Runnable) { return ConnectorScheduling::NotNow; } else { return ConnectorScheduling::Later; @@ -230,18 +259,14 @@ impl ConnectorApplication { }, ApplicationJob::SyncRound(mut description) => { // Entering sync mode - if description.is_empty() { - // To simplify logic we always have one instruction - description.push(ApplicationSyncAction::Noop); - } - self.sync_desc = description; self.is_in_sync = true; + debug_assert!(self.last_finished_handled.is_none()); debug_assert!(self.branch_extra.is_empty()); - let first_branch_id = self.exec_tree.start_sync(); - self.exec_tree.push_into_queue(QueueKind::Runnable, first_branch_id); - self.consensus.start_sync(ctx); + let first_branch_id = self.tree.start_sync(); + self.tree.push_into_queue(QueueKind::Runnable, first_branch_id); + self.consensus.start_sync(comp_ctx); self.branch_extra.push(0); // set first branch to first instruction return ConnectorScheduling::Immediate; @@ -255,6 +280,43 @@ impl ConnectorApplication { return ConnectorScheduling::NotNow; } + + fn collapse_sync_to_solution_branch(&mut self, branch_id: BranchId, comp_ctx: &mut ComponentCtx) { + debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program + // Notifying tree, consensus algorithm and context of ending sync + let mut fake_vec = Vec::new(); + let mut solution_branch = self.tree.end_sync(branch_id); + self.consensus.end_sync(branch_id, &mut fake_vec); + + for port in fake_vec { + debug_assert!(comp_ctx.get_port_by_id(port).is_some()); + } + + comp_ctx.notify_sync_end(&[]); + + // Turning hashmapped inbox into vector of values + let mut inbox = Vec::with_capacity(solution_branch.inbox.len()); + for action in &self.sync_desc { + match action { + ApplicationSyncAction::Put(_, _) => {}, + ApplicationSyncAction::Get(port_id) => { + debug_assert!(solution_branch.inbox.contains_key(port_id)); + inbox.push(solution_branch.inbox.remove(port_id).unwrap()); + }, + } + } + + // Notifying interface of ending sync + self.is_in_sync = false; + self.sync_desc.clear(); + self.branch_extra.clear(); + self.last_finished_handled = None; + + let (results, notification) = &*self.sync_done; + let mut results = results.lock().unwrap(); + *results = Some(FinishedSync{ inbox }); + notification.notify_one(); + } } // ----------------------------------------------------------------------------- @@ -345,14 +407,14 @@ impl ApplicationInterface { let mut initial_ports = Vec::new(); find_ports_in_value_group(&arguments, &mut initial_ports); for initial_port in &initial_ports { - if !self.owned_ports.iter().any(|v| v == initial_port) { + if !self.owned_ports.iter().any(|(_, v)| v == initial_port) { return Err(ComponentCreationError::UnownedPort); } } // We own all ports, so remove them on this side for initial_port in &initial_ports { - let position = self.owned_ports.iter().position(|v| v == initial_port).unwrap(); + let position = self.owned_ports.iter().position(|(_, v)| v == initial_port).unwrap(); self.owned_ports.remove(position); } @@ -404,7 +466,7 @@ impl ApplicationInterface { { let (is_done, _) = &*self.sync_done; let mut lock = is_done.lock().unwrap(); - *lock = false; + *lock = None; } { @@ -423,8 +485,10 @@ impl ApplicationInterface { } let (is_done, condition) = &*self.sync_done; - let lock = is_done.lock().unwrap(); - condition.wait_while(lock, |v| !*v).unwrap(); // wait while not done + let mut lock = is_done.lock().unwrap(); + lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done + + return Ok(lock.take().unwrap().inbox); } /// Called by runtime to set associated connector's ID.