From 1677e0c9568d3a17eabc44d6156dd8fa744760f5 2021-11-19 18:27:39
From: MH
Date: 2021-11-19 18:27:39
Subject: [PATCH] Halfway through implementing sync failure; fix a bug
 involving a wrong channel mapping

---

diff --git a/src/collections/sets.rs b/src/collections/sets.rs
index ee00c2983f8283ab04a7a4d5b75f277c238c7f87..a93d6f8df3c732fd08608df15a18535fe90e6ab2 100644
--- a/src/collections/sets.rs
+++ b/src/collections/sets.rs
@@ -72,15 +72,18 @@ impl VecSet {
         self.inner.pop()
     }

+    /// Pushes a new element into the set. Returns `false` if it was already
+    /// present and `true` if it is newly added.
     #[inline]
-    pub fn push(&mut self, to_push: T) {
+    pub fn push(&mut self, to_push: T) -> bool {
         for element in self.inner.iter() {
             if *element == to_push {
-                return;
+                return false;
             }
         }

         self.inner.push(to_push);
+        return true;
     }

     #[inline]
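The consensus changes later in this patch rely on the new return value to send at most one message per peer: only a first-time insertion triggers a send. A self-contained sketch of the contract (this `VecSet` is a stand-in reimplementation for illustration, not the crate's type):

struct VecSet<T> { inner: Vec<T> }

impl<T: PartialEq> VecSet<T> {
    fn new() -> Self { Self { inner: Vec::new() } }

    // Combined membership test and insert: true means "newly added".
    fn push(&mut self, to_push: T) -> bool {
        if self.inner.iter().any(|e| *e == to_push) {
            return false; // already present
        }
        self.inner.push(to_push);
        true // newly added
    }
}

fn main() {
    let mut notified = VecSet::new();
    if notified.push(7u32) { /* first insertion: send the message here */ }
    assert!(!notified.push(7u32)); // second attempt: already present
}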
diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs
index c08dcdce0381a2914611334344291af94e995588..f4b4e6bce2b76eda2f3da9ed9c8c1b3ef09b2666 100644
--- a/src/runtime2/connector.rs
+++ b/src/runtime2/connector.rs
@@ -34,10 +34,12 @@ use crate::common::ComponentState;
 use crate::protocol::eval::{EvalContinuation, EvalError, Prompt, Value, ValueGroup};
 use crate::protocol::{RunContext, RunResult};
 use crate::runtime2::branch::PreparedStatement;
+use crate::runtime2::consensus::RoundConclusion;
+use crate::runtime2::inbox::SyncPortMessage;

 use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState};
 use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
-use super::inbox::{DataMessage, DataContent, Message, SyncMessage, PublicInbox};
+use super::inbox::{DataMessage, DataContent, Message, SyncCompMessage, PublicInbox};
 use super::native::Connector;
 use super::port::{PortKind, PortIdLocal};
 use super::scheduler::{ComponentCtx, SchedulerCtx};
@@ -56,7 +58,7 @@ impl ConnectorPublic {
     }
 }

-#[derive(Debug, PartialEq, Eq, Copy)]
+#[derive(Debug, PartialEq, Eq, Clone, Copy)]
 enum Mode {
     NonSync, // running non-sync code
     Sync, // running sync code (in potentially multiple branches)
@@ -106,6 +108,7 @@ impl<'a> RunContext for ConnectorRunContext<'a>{
     }

     fn fires(&mut self, port: PortId) -> Option<Value> {
+        todo!("Remove fires() now");
         let port_id = PortIdLocal::new(port.0.u32_suffix);
         let annotation = self.consensus.get_annotation(self.branch_id, port_id);
         return annotation.expected_firing.map(|v| Value::Bool(v));
@@ -130,7 +133,10 @@ impl<'a> RunContext for ConnectorRunContext<'a>{
 }

 impl Connector for ConnectorPDL {
     fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
-        self.handle_new_messages(comp_ctx);
+        if let Some(scheduling) = self.handle_new_messages(comp_ctx) {
+            return scheduling;
+        }
+
         match self.mode {
             Mode::Sync => {
                 // Run in sync mode
@@ -142,10 +148,9 @@ impl Connector for ConnectorPDL {
                 iter_id = self.tree.get_queue_next(branch_id);
                 self.last_finished_handled = Some(branch_id);

-                if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
-                    // Actually found a solution
-                    self.enter_non_sync_mode(solution_branch_id, comp_ctx);
-                    return ConnectorScheduling::Immediate;
+                if let Some(round_conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
+                    // Actually found a solution
+                    return self.enter_non_sync_mode(round_conclusion, comp_ctx);
                 }

                 self.last_finished_handled = Some(branch_id);
@@ -183,14 +188,19 @@ impl ConnectorPDL {

     // --- Handling messages

-    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) {
+    pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
         while let Some(message) = ctx.read_next_message() {
             match message {
                 Message::Data(message) => self.handle_new_data_message(message, ctx),
-                Message::Sync(message) => self.handle_new_sync_message(message, ctx),
+                Message::SyncComp(message) => {
+                    return self.handle_new_sync_comp_message(message, ctx)
+                },
+                Message::SyncPort(message) => self.handle_new_sync_port_message(message, ctx),
                 Message::Control(_) => unreachable!("control message in component"),
             }
         }
+
+        return None;
     }

     pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) {
@@ -211,23 +221,30 @@
             // This branch can receive, so fork and give it the message
             let receiving_branch_id = self.tree.fork_branch(branch_id);
+            println!("DEBUG: ### Branching due to new data message");
             self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);

             let receiving_branch = &mut self.tree[receiving_branch_id];
             debug_assert!(receiving_branch.awaiting_port == message.data_header.target_port);
             receiving_branch.awaiting_port = PortIdLocal::new_invalid();
             receiving_branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());
-            self.consensus.notify_of_received_message(receiving_branch_id, &message);
+            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);

             // And prepare the branch for running
             self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
         }
     }

-    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
-        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
-            self.enter_non_sync_mode(solution_branch_id, ctx);
+    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<ConnectorScheduling> {
+        if let Some(round_conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
+            return Some(self.enter_non_sync_mode(round_conclusion, ctx));
         }
+
+        return None;
+    }
+
+    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
+        self.consensus.handle_new_sync_port_message(message, ctx);
     }

     // --- Running code
@@ -275,10 +292,10 @@
                 let firing_branch_id = self.tree.fork_branch(branch_id);
                 let silent_branch_id = self.tree.fork_branch(branch_id);
                 self.consensus.notify_of_new_branch(branch_id, firing_branch_id);
-                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true);
+                let _result = self.consensus.notify_of_speculative_mapping(firing_branch_id, port_id, true, comp_ctx);
                 debug_assert_eq!(_result, Consistency::Valid);
                 self.consensus.notify_of_new_branch(branch_id, silent_branch_id);
-                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false);
+                let _result = self.consensus.notify_of_speculative_mapping(silent_branch_id, port_id, false, comp_ctx);
                 debug_assert_eq!(_result, Consistency::Valid);

                 // Somewhat important: we push the firing one first, such that
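The two `fork_branch` calls above implement the speculative split on an undecided port: one child branch assumes the port fires, the other that it stays silent, and the firing child is queued first. A self-contained sketch of that pattern with simplified stand-in types (the crate's real `ExecTree`/`Consensus` APIs differ):

#[derive(Clone)]
struct Branch { assumed_firing: Vec<(u32, bool)> } // (port id, does it fire?)

fn fork_on_port(branches: &mut Vec<Branch>, parent: usize, port: u32) -> (usize, usize) {
    let mut firing = branches[parent].clone();
    firing.assumed_firing.push((port, true));   // child that assumes a firing port
    let mut silent = branches[parent].clone();
    silent.assumed_firing.push((port, false));  // child that assumes a silent port

    let firing_id = branches.len();
    branches.push(firing);
    let silent_id = branches.len();
    branches.push(silent);
    (firing_id, silent_id) // queue firing_id first, like the code above
}

fn main() {
    let mut branches = vec![Branch { assumed_firing: Vec::new() }];
    let (f, s) = fork_on_port(&mut branches, 0, 1);
    assert_eq!(branches[f].assumed_firing, vec![(1, true)]);
    assert_eq!(branches[s].assumed_firing, vec![(1, false)]);
}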
@@ -310,8 +327,9 @@
                     branch.awaiting_port = PortIdLocal::new_invalid();
                     branch.prepared = PreparedStatement::PerformedGet(message.content.as_message().unwrap().clone());

+                    println!("DEBUG: ### Branching due to BlockGet with existing message");
                     self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
-                    self.consensus.notify_of_received_message(receiving_branch_id, &message);
+                    self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);

                     self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
                     any_message_received = true;
@@ -360,7 +378,8 @@
                         &pd.modules, &pd.heap,
                         String::from("attempted to 'put' on port that is no longer owned")
                     );
-                    self.mode = Mode::SyncError(eval_error);
+                    self.eval_error = Some(eval_error);
+                    self.mode = Mode::SyncError;
                 }

                 branch.prepared = PreparedStatement::PerformedPut;
@@ -399,7 +418,7 @@
         match run_result {
             EvalContinuation::ComponentTerminated => {
                 branch.sync_state = SpeculativeState::Finished;
-
+                println!("DEBUG: ************ DOING THEM EXITS");
                 return ConnectorScheduling::Exit;
             },
             EvalContinuation::SyncBlockStart => {
@@ -409,6 +428,7 @@
                 self.consensus.start_sync(comp_ctx);
                 self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id);
                 self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id);
+                self.mode = Mode::Sync;

                 return ConnectorScheduling::Immediate;
             },
@@ -449,38 +469,43 @@

     /// Helper that moves the component's state back into non-sync mode, using
     /// the provided solution branch ID as the branch that should be committed to
-    /// memory.
-    fn enter_non_sync_mode(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtx) {
+    /// memory. If the round concluded in failure, the returned scheduling may
+    /// tell the component to exit.
+    fn enter_non_sync_mode(&mut self, conclusion: RoundConclusion, ctx: &mut ComponentCtx) -> ConnectorScheduling {
         debug_assert!(self.mode == Mode::Sync || self.mode == Mode::SyncError);

-        let mut fake_vec = Vec::new();
-        self.tree.end_sync(solution_branch_id);
-        self.consensus.end_sync(solution_branch_id, &mut fake_vec);
-
-        for port in fake_vec {
-            // TODO: Handle sent/received ports
-            debug_assert!(ctx.get_port_by_id(port).is_some());
-        }
-
-        ctx.notify_sync_end(&[]);
-        self.last_finished_handled = None;
-        self.eval_error = None; // in case we came from the SyncError mode
-        self.mode = Mode::NonSync;
-    }
-
-    /// Helper that moves the component's state into sync-error mode, sending
-    /// the appropriate errors where needed. The branch that caused the fatal
-    /// error and the evaluation error should be provided.
-    fn enter_sync_error_mode(&mut self, failing_branch_id: BranchId, ctx: &mut ComponentCtx, eval_error: EvalError) {
-        debug_assert!(self.mode == Mode::Sync);
-        debug_assert!(self.eval_error.is_none());
-        self.mode = Mode::SyncError;
-        self.eval_error = Some(eval_error);
+        // Depending on local state decide what to do
+        let final_branch_id = match conclusion {
+            RoundConclusion::Success(branch_id) => Some(branch_id),
+            RoundConclusion::Failure => if self.mode == Mode::SyncError {
+                // We experienced an error, so exit now
+                None
+            } else {
+                // We didn't experience an error, so retry
+                // TODO: Decide what to do with sync errors
+                Some(BranchId::new_invalid())
+            }
+        };

-        let failing_branch = &mut self.tree[failing_branch_id];
-        failing_branch.sync_state = SpeculativeState::Error;
+        if let Some(solution_branch_id) = final_branch_id {
+            let mut fake_vec = Vec::new();
+            self.tree.end_sync(solution_branch_id);
+            self.consensus.end_sync(solution_branch_id, &mut fake_vec);
+            debug_assert!(fake_vec.is_empty());
+            ctx.notify_sync_end(&[]);
+            self.last_finished_handled = None;
+            self.eval_error = None; // in case we came from the SyncError mode
+            self.mode = Mode::NonSync;
+            return ConnectorScheduling::Immediate;
+        } else {
+            // No final branch, because we're supposed to exit!
+ panic!("TEMPTEMP: NOOOOOOOOO 1"); + self.last_finished_handled = None; + self.mode = Mode::Error; + return ConnectorScheduling::Exit; + } } /// Runs the prompt repeatedly until some kind of execution-blocking diff --git a/src/runtime2/consensus.rs b/src/runtime2/consensus.rs index 951045671b8adc3c3eb5901f27538ad6d3c881a7..e0ae0d1004a4669e57bab2fcaff4615debac4a99 100644 --- a/src/runtime2/consensus.rs +++ b/src/runtime2/consensus.rs @@ -2,19 +2,20 @@ use crate::collections::VecSet; use crate::protocol::eval::ValueGroup; use crate::runtime2::inbox::BranchMarker; +use crate::runtime2::scheduler::ComponentPortChange; use super::ConnectorId; use super::branch::BranchId; use super::port::{ChannelId, PortIdLocal}; use super::inbox::{ - Message, PortAnnotation, + Message, ChannelAnnotation, DataMessage, DataContent, DataHeader, - SyncMessage, SyncContent, SyncHeader, + SyncCompMessage, SyncCompContent, SyncPortMessage, SyncPortContent, SyncHeader, }; use super::scheduler::ComponentCtx; struct BranchAnnotation { - port_mapping: Vec, + channel_mapping: Vec, cur_marker: BranchMarker, } @@ -31,6 +32,12 @@ pub(crate) struct GlobalSolution { channel_mapping: Vec<(ChannelId, BranchMarker)>, // TODO: This can go, is debugging info } +#[derive(Debug, PartialEq, Eq)] +pub enum RoundConclusion { + Failure, + Success(BranchId), +} + // ----------------------------------------------------------------------------- // Consensus // ----------------------------------------------------------------------------- @@ -61,6 +68,9 @@ pub(crate) struct Consensus { // Gathered state from communication encountered_ports: VecSet, // to determine if we should send "port remains silent" messages. solution_combiner: SolutionCombiner, + handled_wave: bool, // encountered notification wave in this round + conclusion: Option, + ack_remaining: u32, // --- Persistent state peers: Vec, sync_round: u32, @@ -82,6 +92,9 @@ impl Consensus { branch_markers: Vec::new(), encountered_ports: VecSet::new(), solution_combiner: SolutionCombiner::new(), + handled_wave: false, + conclusion: None, + ack_remaining: 0, peers: Vec::new(), sync_round: 0, workspace_ports: Vec::new(), @@ -96,9 +109,10 @@ impl Consensus { } /// TODO: Remove this once multi-fire is in place - pub fn get_annotation(&self, branch_id: BranchId, port_id: PortIdLocal) -> &PortAnnotation { + #[deprecated] + pub fn get_annotation(&self, branch_id: BranchId, channel_id: PortIdLocal) -> &ChannelAnnotation { let branch = &self.branch_annotations[branch_id.index as usize]; - let port = branch.port_mapping.iter().find(|v| v.port_id == port_id).unwrap(); + let port = branch.channel_mapping.iter().find(|v| v.channel_id.index == channel_id.index).unwrap(); return port; } @@ -113,9 +127,9 @@ impl Consensus { // We'll use the first "branch" (the non-sync one) to store our ports, // this allows cloning if we created a new branch. self.branch_annotations.push(BranchAnnotation{ - port_mapping: ctx.get_ports().iter() - .map(|v| PortAnnotation{ - port_id: v.self_id, + channel_mapping: ctx.get_ports().iter() + .map(|v| ChannelAnnotation { + channel_id: v.channel_id, registered_id: None, expected_firing: None, }) @@ -133,11 +147,12 @@ impl Consensus { pub fn notify_of_new_branch(&mut self, parent_branch_id: BranchId, new_branch_id: BranchId) { // If called correctly. Then each time we are notified the new branch's // index is the length in `branch_annotations`. 
+ println!("DEBUG: Branch {} became forked into {}", parent_branch_id.index, new_branch_id.index); debug_assert!(self.branch_annotations.len() == new_branch_id.index as usize); let parent_branch_annotations = &self.branch_annotations[parent_branch_id.index as usize]; let new_marker = BranchMarker::new(self.branch_markers.len() as u32); let new_branch_annotations = BranchAnnotation{ - port_mapping: parent_branch_annotations.port_mapping.clone(), + channel_mapping: parent_branch_annotations.channel_mapping.clone(), cur_marker: new_marker, }; self.branch_annotations.push(new_branch_annotations); @@ -145,22 +160,37 @@ impl Consensus { } /// Notifies the consensus algorithm that a particular branch has - /// encountered an unrecoverable error. If the return value is `false`, then - /// the caller can enter a "normal" exit mode instead of the special "sync" - /// exit mode. - pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId) -> bool { + /// encountered an unrecoverable error. + pub fn notify_of_fatal_branch(&mut self, failed_branch_id: BranchId, ctx: &mut ComponentCtx) -> Option { debug_assert!(self.is_in_sync()); // Check for trivial case, where branch has not yet communicated within // the consensus algorithm let branch = &self.branch_annotations[failed_branch_id.index as usize]; - if branch.port_mapping.iter().all(|v| v.registered_id.is_none()) { - return false; + if branch.channel_mapping.iter().all(|v| v.registered_id.is_none()) { + return Some(RoundConclusion::Failure); } - // Branch has communicated. Since we need to discover the entire + // We need to go through the hassle of notifying all participants in the + // sync round that we've encountered an error. + // --- notify leader + let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalFailure, ctx); + + // --- initiate discovery wave (to let leader know about all components) + self.handled_wave = true; + for mapping in &self.branch_annotations[0].channel_mapping { + let channel_id = mapping.channel_id; + let port_info = ctx.get_port_by_channel_id(channel_id).unwrap(); + let message = SyncPortMessage{ + sync_header: self.create_sync_header(ctx), + source_port: port_info.self_id, + target_port: port_info.peer_id, + content: SyncPortContent::NotificationWave, + }; + ctx.submit_message(Message::SyncPort(message)); + } - return true; + return maybe_conclusion; } /// Notifies the consensus algorithm that a branch has reached the end of @@ -169,7 +199,7 @@ impl Consensus { pub fn notify_of_finished_branch(&self, branch_id: BranchId) -> Consistency { debug_assert!(self.is_in_sync()); let branch = &self.branch_annotations[branch_id.index as usize]; - for mapping in &branch.port_mapping { + for mapping in &branch.channel_mapping { match mapping.expected_firing { Some(expected) => { if expected != mapping.registered_id.is_some() { @@ -187,11 +217,14 @@ impl Consensus { /// Notifies the consensus algorithm that a particular branch has assumed /// a speculative value for its port mapping. 
-    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool) -> Consistency {
+    pub fn notify_of_speculative_mapping(&mut self, branch_id: BranchId, port_id: PortIdLocal, does_fire: bool, ctx: &ComponentCtx) -> Consistency {
         debug_assert!(self.is_in_sync());
+
+        let port_desc = ctx.get_port_by_id(port_id).unwrap();
+        let channel_id = port_desc.channel_id;
         let branch = &mut self.branch_annotations[branch_id.index as usize];
-        for mapping in &mut branch.port_mapping {
-            if mapping.port_id == port_id {
+        for mapping in &mut branch.channel_mapping {
+            if mapping.channel_id == channel_id {
                 match mapping.expected_firing {
                     None => {
                         // Not yet mapped, perform speculative mapping
@@ -218,20 +251,21 @@ impl Consensus {
     /// appropriate component. If it is the leader then there is a chance that
     /// this solution completes a global solution. In that case the solution
     /// branch ID will be returned.
-    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<BranchId> {
+    pub(crate) fn handle_new_finished_sync_branch(&mut self, branch_id: BranchId, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
         // Turn the port mapping into a local solution
-        let source_mapping = &self.branch_annotations[branch_id.index as usize].port_mapping;
+        let source_mapping = &self.branch_annotations[branch_id.index as usize].channel_mapping;
         let mut target_mapping = Vec::with_capacity(source_mapping.len());

         for port in source_mapping {
             // Note: if the port is silent, and we've never communicated
             // over the port, then we need to do so now, to let the peer
             // component know about our sync leader state.
-            let port_desc = ctx.get_port_by_id(port.port_id).unwrap();
+            let port_desc = ctx.get_port_by_channel_id(port.channel_id).unwrap();
+            let self_port_id = port_desc.self_id;
             let peer_port_id = port_desc.peer_id;
             let channel_id = port_desc.channel_id;

-            if !self.encountered_ports.contains(&port.port_id) {
+            if !self.encountered_ports.contains(&self_port_id) {
                 ctx.submit_message(Message::Data(DataMessage {
                     sync_header: SyncHeader{
                         sending_component_id: ctx.id,
@@ -240,13 +274,13 @@ impl Consensus {
                     },
                     data_header: DataHeader{
                         expected_mapping: source_mapping.clone(),
-                        sending_port: port.port_id,
+                        sending_port: self_port_id,
                         target_port: peer_port_id,
                         new_mapping: BranchMarker::new_invalid(),
                     },
                     content: DataContent::SilentPortNotification,
                 }));
-                self.encountered_ports.push(port.port_id);
+                self.encountered_ports.push(self_port_id);
             }

             target_mapping.push((
@@ -260,29 +294,30 @@ impl Consensus {
             final_branch_id: branch_id,
             port_mapping: target_mapping,
         };
-        let solution_branch = self.send_or_store_local_solution(local_solution, ctx);
-        return solution_branch;
+        let maybe_conclusion = self.send_to_leader_or_handle_as_leader(SyncCompContent::LocalSolution(local_solution), ctx);
+        return maybe_conclusion;
     }

     /// Notifies the consensus algorithm about the chosen branch to commit to
-    /// memory.
-    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<PortIdLocal>) {
+    /// memory (may be the invalid "start" branch)
+    pub fn end_sync(&mut self, branch_id: BranchId, final_ports: &mut Vec<ComponentPortChange>) {
         debug_assert!(self.is_in_sync());
         // TODO: Handle sending and receiving ports

         // Set final ports
-        final_ports.clear();
         let branch = &self.branch_annotations[branch_id.index as usize];
-        for port in &branch.port_mapping {
-            final_ports.push(port.port_id);
-        }

         // Clear out internal storage to defaults
         self.highest_connector_id = ConnectorId::new_invalid();
         self.branch_annotations.clear();
+        self.branch_markers.clear();
         self.encountered_ports.clear();
         self.solution_combiner.clear();
+        self.handled_wave = false;
+        self.conclusion = None;
+        self.ack_remaining = 0;

         // And modify persistent storage
         self.sync_round += 1;
         for peer in self.peers.iter_mut() {
@@ -298,11 +333,12 @@ impl Consensus {
     pub fn handle_message_to_send(&mut self, branch_id: BranchId, source_port_id: PortIdLocal, content: &ValueGroup, ctx: &mut ComponentCtx) -> (SyncHeader, DataHeader) {
         debug_assert!(self.is_in_sync());
         let branch = &mut self.branch_annotations[branch_id.index as usize];
+        let port_info = ctx.get_port_by_id(source_port_id).unwrap();

         if cfg!(debug_assertions) {
             // Check for consistent mapping
-            let port = branch.port_mapping.iter()
-                .find(|v| v.port_id == source_port_id)
+            let port = branch.channel_mapping.iter()
+                .find(|v| v.channel_id == port_info.channel_id)
                 .unwrap();
             debug_assert!(port.expected_firing == None || port.expected_firing == Some(true));
         }
@@ -318,17 +354,16 @@ impl Consensus {
         // Construct data header
         // TODO: Handle multiple firings. Right now we just assign the current
         //       branch to the `None` value because we know we can only send once.
-        let port_info = ctx.get_port_by_id(source_port_id).unwrap();
         let data_header = DataHeader{
-            expected_mapping: branch.port_mapping.clone(),
+            expected_mapping: branch.channel_mapping.clone(),
             sending_port: port_info.self_id,
             target_port: port_info.peer_id,
             new_mapping: branch.cur_marker,
         };

         // Update port mapping
-        for mapping in &mut branch.port_mapping {
-            if mapping.port_id == source_port_id {
+        for mapping in &mut branch.channel_mapping {
+            if mapping.channel_id == port_info.channel_id {
                 mapping.expected_firing = Some(true);
                 mapping.registered_id = Some(branch.cur_marker);
             }
@@ -348,45 +383,102 @@ impl Consensus {
     /// responsible for checking for branches that might be able to receive
     /// the message.
     pub fn handle_new_data_message(&mut self, message: &DataMessage, ctx: &mut ComponentCtx) -> bool {
-        return self.handle_received_sync_header(&message.sync_header, ctx)
+        let handled = self.handle_received_sync_header(&message.sync_header, ctx);
+        if handled {
+            self.encountered_ports.push(message.data_header.target_port);
+        }
+        return handled;
     }
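The "wrong mapping" fix of this commit shows in the hunks above: annotations are now keyed by channel instead of by port, so every lookup first resolves a local port ID to its channel ID. A self-contained sketch of why that matters, with stand-in types (the crate's real `Port`/`ChannelAnnotation` carry more fields):

#[derive(Clone, Copy, PartialEq)] struct PortIdLocal(u32);
#[derive(Clone, Copy, PartialEq)] struct ChannelId(u32);

struct Port { self_id: PortIdLocal, channel_id: ChannelId }
struct ChannelAnnotation { channel_id: ChannelId, expected_firing: Option<bool> }

// A port ID is endpoint-local: the two ends of a channel have different port
// IDs but share one channel ID. Comparing port IDs directly (the old code)
// can therefore never match a peer's mapping; comparing channel IDs can.
fn find_annotation<'a>(
    ports: &[Port],
    annotations: &'a mut [ChannelAnnotation],
    port_id: PortIdLocal,
) -> Option<&'a mut ChannelAnnotation> {
    let channel_id = ports.iter().find(|p| p.self_id == port_id)?.channel_id;
    annotations.iter_mut().find(|a| a.channel_id == channel_id)
}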

     /// Handles a new sync message by handling the sync header and the contents
     /// of the message. Returns `Some` with the branch ID of the global solution
     /// if the sync solution has been found.
-    pub fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) -> Option<BranchId> {
+    pub fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
         if !self.handle_received_sync_header(&message.sync_header, ctx) {
             return None;
         }

         // And handle the contents
         debug_assert_eq!(message.target_component_id, ctx.id);
-        match message.content {
-            SyncContent::Notification => {
-                // We were just interested in the header
-                return None;
-            },
-            SyncContent::LocalSolution(solution) => {
-                // We might be the leader, or earlier messages caused us to not
-                // be the leader anymore.
-                return self.send_or_store_local_solution(solution, ctx);
-            },
-            SyncContent::GlobalSolution(solution) => {
-                // Take branch of interest and return it.
+
+        match &message.content {
+            SyncCompContent::LocalFailure |
+            SyncCompContent::LocalSolution(_) |
+            SyncCompContent::PartialSolution(_) |
+            SyncCompContent::AckFailure |
+            SyncCompContent::Presence(_, _) => {
+                // Needs to be handled by the leader
+                return self.send_to_leader_or_handle_as_leader(message.content, ctx);
+            },
+            SyncCompContent::GlobalSolution(solution) => {
+                // Found a global solution
+                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
                 let (_, branch_id) = solution.component_branches.iter()
-                    .find(|(connector_id, _)| *connector_id == ctx.id)
+                    .find(|(component_id, _)| *component_id == ctx.id)
                     .unwrap();
-                return Some(*branch_id);
+                return Some(RoundConclusion::Success(*branch_id));
+            },
+            SyncCompContent::GlobalFailure => {
+                // Global failure of round, send Ack to leader
+                debug_assert_ne!(self.highest_connector_id, ctx.id); // not the leader
+                let _result = self.send_to_leader_or_handle_as_leader(SyncCompContent::AckFailure, ctx);
+                debug_assert!(_result.is_none());
+                return Some(RoundConclusion::Failure);
+            },
+            SyncCompContent::Notification => {
+                // We were just interested in the sync header we handled above
+                return None;
             }
         }
     }
+
+    pub fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
+        if !self.handle_received_sync_header(&message.sync_header, ctx) {
+            return;
+        }
+
+        debug_assert!(self.is_in_sync());
+        debug_assert!(ctx.get_port_by_id(message.target_port).is_some());
+        match message.content {
+            SyncPortContent::NotificationWave => {
+                // Wave to discover everyone in the network, handling sync
+                // header takes care of leader discovery, here we need to make
+                // sure we propagate the wave
+                if self.handled_wave {
+                    return;
+                }
+
+                self.handled_wave = true;
+
+                // Propagate wave to all peers except the one that has sent us
+                // the wave.
+                for mapping in &self.branch_annotations[0].channel_mapping {
+                    let channel_id = mapping.channel_id;
+                    let port_desc = ctx.get_port_by_channel_id(channel_id).unwrap();
+                    if port_desc.self_id == message.target_port {
+                        // Wave came from this port, no need to send one back
+                        continue;
+                    }
+
+                    let message = SyncPortMessage{
+                        sync_header: self.create_sync_header(ctx),
+                        source_port: port_desc.self_id,
+                        target_port: port_desc.peer_id,
+                        content: SyncPortContent::NotificationWave,
+                    };
+                    ctx.submit_message(Message::SyncPort(message));
+                }
+            }
+        }
+    }

-    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage) {
+    pub fn notify_of_received_message(&mut self, branch_id: BranchId, message: &DataMessage, ctx: &ComponentCtx) {
         debug_assert!(self.branch_can_receive(branch_id, message));
+        let target_port = ctx.get_port_by_id(message.data_header.target_port).unwrap();
         let branch = &mut self.branch_annotations[branch_id.index as usize];
-        for mapping in &mut branch.port_mapping {
-            if mapping.port_id == message.data_header.target_port {
+        for mapping in &mut branch.channel_mapping {
+            if mapping.channel_id == target_port.channel_id {
                 // Found the port in which the message should be inserted
                 mapping.registered_id = Some(message.data_header.new_mapping);

@@ -425,8 +517,8 @@ impl Consensus {
         for expected in &message.data_header.expected_mapping {
             // If we own the port, then we have an entry in the
             // annotation, check if the current mapping matches
-            for current in &annotation.port_mapping {
-                if expected.port_id == current.port_id {
+            for current in &annotation.channel_mapping {
+                if expected.channel_id == current.channel_id {
                     if expected.registered_id != current.registered_id {
                         // IDs do not match, we cannot receive the
                         // message in this branch
@@ -458,25 +550,25 @@ impl Consensus {
                 continue
             }

-            let message = SyncMessage {
+            let message = SyncCompMessage {
                 sync_header: self.create_sync_header(ctx),
                 target_component_id: peer.id,
-                content: SyncContent::Notification,
+                content: SyncCompContent::Notification,
             };
-            ctx.submit_message(Message::Sync(message));
+            ctx.submit_message(Message::SyncComp(message));
         }

         // But also send our locally combined solution
-        self.forward_local_solutions(ctx);
+        self.forward_local_data_to_new_leader(ctx);
     } else if sync_header.highest_component_id < self.highest_connector_id {
         // Sender has lower leader ID, so it should know about our higher
         // one.
-        let message = SyncMessage {
+        let message = SyncCompMessage {
             sync_header: self.create_sync_header(ctx),
             target_component_id: sync_header.sending_component_id,
-            content: SyncContent::Notification
+            content: SyncCompContent::Notification
         };
-        ctx.submit_message(Message::Sync(message));
+        ctx.submit_message(Message::SyncComp(message));
     } // else: exactly equal, so do nothing

     return true;
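Leader discovery rides along on every sync header: the highest component ID seen so far wins, a demoted leader forwards its partial results, and a sender using a stale leader gets corrected. A condensed, self-contained sketch of that rule:

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct ComponentId(u32);

#[derive(Debug, PartialEq)]
enum Action { Nothing, AdoptAndForward, TellSenderWhoLeads }

fn handle_claimed_leader(current: &mut ComponentId, claimed: ComponentId) -> Action {
    if claimed > *current {
        *current = claimed;        // higher ID wins the leadership
        Action::AdoptAndForward    // forward stored partial results to it
    } else if claimed < *current {
        Action::TellSenderWhoLeads // notify the sender of the higher leader
    } else {
        Action::Nothing            // already agree on the leader
    }
}

fn main() {
    let mut leader = ComponentId(3);
    assert_eq!(handle_claimed_leader(&mut leader, ComponentId(5)), Action::AdoptAndForward);
    assert_eq!(handle_claimed_leader(&mut leader, ComponentId(2)), Action::TellSenderWhoLeads);
}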
@@ -509,39 +601,120 @@ impl Consensus {
         }
     }

-    fn send_or_store_local_solution(&mut self, solution: LocalSolution, ctx: &mut ComponentCtx) -> Option<BranchId> {
+    /// Sends a message towards the leader, if already the leader then the
+    /// message will be handled immediately.
+    fn send_to_leader_or_handle_as_leader(&mut self, content: SyncCompContent, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
         if self.highest_connector_id == ctx.id {
             // We are the leader
-            if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(solution) {
-                let mut my_final_branch_id = BranchId::new_invalid();
-                for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
-                    if connector_id == ctx.id {
-                        // This is our solution branch
-                        my_final_branch_id = branch_id;
-                        continue;
-                    }
-
-                    let message = SyncMessage {
-                        sync_header: self.create_sync_header(ctx),
-                        target_component_id: connector_id,
-                        content: SyncContent::GlobalSolution(global_solution.clone()),
-                    };
-                    ctx.submit_message(Message::Sync(message));
-                }
-
-                debug_assert!(my_final_branch_id.is_valid());
-                return Some(my_final_branch_id);
-            } else {
-                return None;
+            match content {
+                SyncCompContent::LocalFailure => {
+                    if self.solution_combiner.mark_failure_and_check_for_global_failure() {
+                        return self.handle_global_failure_as_leader(ctx);
+                    }
+                },
+                SyncCompContent::LocalSolution(local_solution) => {
+                    if let Some(global_solution) = self.solution_combiner.add_solution_and_check_for_global_solution(local_solution) {
+                        return self.handle_global_solution_as_leader(global_solution, ctx);
+                    }
+                },
+                SyncCompContent::PartialSolution(partial_solution) => {
+                    if let Some(conclusion) = self.solution_combiner.combine(partial_solution) {
+                        match conclusion {
+                            LeaderConclusion::Solution(global_solution) => {
+                                return self.handle_global_solution_as_leader(global_solution, ctx);
+                            },
+                            LeaderConclusion::Failure => {
+                                return self.handle_global_failure_as_leader(ctx);
+                            }
+                        }
+                    }
+                },
+                SyncCompContent::Presence(component_id, presence) => {
+                    if self.solution_combiner.add_presence_and_check_for_global_failure(component_id, &presence) {
+                        return self.handle_global_failure_as_leader(ctx);
+                    }
+                },
+                SyncCompContent::AckFailure => {
+                    debug_assert_eq!(Some(RoundConclusion::Failure), self.conclusion);
+                    debug_assert!(self.ack_remaining > 0);
+                    self.ack_remaining -= 1;
+                    if self.ack_remaining == 0 {
+                        return Some(RoundConclusion::Failure);
+                    }
+                }
+                SyncCompContent::Notification | SyncCompContent::GlobalSolution(_) |
+                SyncCompContent::GlobalFailure => {
+                    unreachable!("unexpected message content for leader");
+                },
             }
         } else {
             // Someone else is the leader
-            let message = SyncMessage {
+            let message = SyncCompMessage {
                 sync_header: self.create_sync_header(ctx),
                 target_component_id: self.highest_connector_id,
-                content: SyncContent::LocalSolution(solution),
+                content,
             };
-            ctx.submit_message(Message::Sync(message));
+            ctx.submit_message(Message::SyncComp(message));
+        }
+
+        return None;
+    }
+
+    fn handle_global_solution_as_leader(&mut self, global_solution: GlobalSolution, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
+        if self.conclusion.is_some() {
+            return None;
+        }
+
+        // Handle the global solution
+        let mut my_final_branch_id = BranchId::new_invalid();
+        for (connector_id, branch_id) in global_solution.component_branches.iter().copied() {
+            if connector_id == ctx.id {
+                // This is our solution branch
+                my_final_branch_id = branch_id;
+                continue;
+            }
+
+            let message = SyncCompMessage {
+                sync_header: self.create_sync_header(ctx),
+                target_component_id: connector_id,
+                content: SyncCompContent::GlobalSolution(global_solution.clone()),
+            };
+            ctx.submit_message(Message::SyncComp(message));
+        }
+
+        debug_assert!(my_final_branch_id.is_valid());
+        self.conclusion = Some(RoundConclusion::Success(my_final_branch_id));
+        return Some(RoundConclusion::Success(my_final_branch_id));
+    }
+
+    fn handle_global_failure_as_leader(&mut self, ctx: &mut ComponentCtx) -> Option<RoundConclusion> {
+        debug_assert!(self.solution_combiner.failure_reported && self.solution_combiner.check_for_global_failure());
+        if self.conclusion.is_some() {
+            return None;
+        }
+
+        // TODO: Performance
+        let mut encountered = VecSet::new();
+        for presence in &self.solution_combiner.presence {
+            if presence.added_by != ctx.id {
+                // Did not add it ourselves
+                if encountered.push(presence.added_by) {
+                    // Not yet sent a message
+                    let message = SyncCompMessage{
+                        sync_header: self.create_sync_header(ctx),
+                        target_component_id: presence.added_by,
+                        content: SyncCompContent::GlobalFailure,
+                    };
+                    ctx.submit_message(Message::SyncComp(message));
+                }
+            }
+        }
+
+        self.conclusion = Some(RoundConclusion::Failure);
+        if encountered.is_empty() {
+            // We don't have to wait on Acks
+            return Some(RoundConclusion::Failure);
+        } else {
+            return None;
+        }
+    }
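On a failed round the leader broadcasts GlobalFailure to every distinct component it has seen via presence records, then waits for one AckFailure per recipient before concluding its own round. A self-contained sketch of that bookkeeping; note that the countdown assumes `ack_remaining` is initialized to the number of notified peers, which is not visible in this halfway hunk:

#[derive(Clone, Copy, PartialEq)] struct ComponentId(u32);

struct FailureBroadcast { ack_remaining: u32 }

impl FailureBroadcast {
    // Notify every distinct non-leader participant and remember how many
    // Acks to expect. The boolean is true if the round can fail immediately
    // (nobody else to wait for).
    fn broadcast(me: ComponentId, participants: &[ComponentId]) -> (Self, bool) {
        let mut notified: Vec<ComponentId> = Vec::new();
        for &p in participants {
            if p != me && !notified.contains(&p) {
                notified.push(p); // ... submit a GlobalFailure message to p ...
            }
        }
        let n = notified.len() as u32;
        (FailureBroadcast { ack_remaining: n }, n == 0)
    }

    // Called once per received AckFailure; true once the last Ack arrived.
    fn on_ack(&mut self) -> bool {
        debug_assert!(self.ack_remaining > 0);
        self.ack_remaining -= 1;
        self.ack_remaining == 0
    }
}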
@@ -555,16 +728,16 @@ impl Consensus {
         }
     }

-    fn forward_local_solutions(&mut self, ctx: &mut ComponentCtx) {
+    fn forward_local_data_to_new_leader(&mut self, ctx: &mut ComponentCtx) {
         debug_assert_ne!(self.highest_connector_id, ctx.id);
-        for local_solution in self.solution_combiner.drain() {
-            let message = SyncMessage {
+        if let Some(partial_solution) = self.solution_combiner.drain() {
+            let message = SyncCompMessage {
                 sync_header: self.create_sync_header(ctx),
                 target_component_id: self.highest_connector_id,
-                content: SyncContent::LocalSolution(local_solution),
+                content: SyncCompContent::PartialSolution(partial_solution),
             };
-            ctx.submit_message(Message::Sync(message));
+            ctx.submit_message(Message::SyncComp(message));
         }
     }
 }

@@ -575,28 +748,28 @@

 // TODO: Remove all debug derives
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct MatchedLocalSolution {
     final_branch_id: BranchId,
     channel_mapping: Vec<(ChannelId, BranchMarker)>,
     matches: Vec<ComponentMatches>,
 }

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct ComponentMatches {
     target_id: ConnectorId,
     target_index: usize,
     match_indices: Vec<usize>, // of local solution in connector
 }

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct ComponentPeer {
     target_id: ConnectorId,
     target_index: usize, // in array of global solution components
     involved_channels: Vec<ChannelId>,
 }

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct ComponentLocalSolutions {
     component: ConnectorId,
     peers: Vec<ComponentPeer>,
@@ -604,9 +777,19 @@ struct ComponentLocalSolutions {
     all_peers_present: bool,
 }

+#[derive(Debug, Clone)]
+struct ChannelPresence {
+    added_by: ConnectorId,
+    channel: ChannelId,
+    both_sides_present: bool,
+}
+
 // TODO: Flatten? Flatten. Flatten everything.
+#[derive(Debug)]
 pub(crate) struct SolutionCombiner {
-    local: Vec<ComponentLocalSolutions>
+    local: Vec<ComponentLocalSolutions>, // used for finding solution
+    presence: Vec<ChannelPresence>, // used to detect all channels present in case of failure
+    failure_reported: bool,
 }

 struct CheckEntry {
     solution_index_in_parent: usize, // index in the solution array of the match entry in the parent
 }

+enum LeaderConclusion {
+    Solution(GlobalSolution),
+    Failure,
+}
+
 impl SolutionCombiner {
     fn new() -> Self {
         return Self{
             local: Vec::new(),
+            presence: Vec::new(),
+            failure_reported: false,
         };
     }

@@ -819,15 +1009,47 @@ impl SolutionCombiner {
         }

-        return self.check_new_solution(component_index, solution_index);
+        return self.check_for_global_solution(component_index, solution_index);
+    }
+
+    fn add_presence_and_check_for_global_failure(&mut self, present_component: ConnectorId, present: &[ChannelId]) -> bool {
+        'new_channel_loop: for new_channel_id in present {
+            for presence in &mut self.presence {
+                if presence.channel == *new_channel_id {
+                    if presence.added_by != present_component {
+                        presence.both_sides_present = true;
+                    }
+
+                    continue 'new_channel_loop;
+                }
+            }
+
+            // Not added yet
+            self.presence.push(ChannelPresence{
+                added_by: present_component,
+                channel: *new_channel_id,
+                both_sides_present: false,
+            });
+        }
+
+        return self.check_for_global_failure();
+    }
+
+    fn mark_failure_and_check_for_global_failure(&mut self) -> bool {
+        self.failure_reported = true;
+        return self.check_for_global_failure();
     }

     /// Checks if, starting at the provided local solution, a global solution
     /// can be formed.
     // TODO: At some point, check if divide and conquer is faster?
-    fn check_new_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option<GlobalSolution> {
-        if !self.can_have_solution() {
-            return None;
+    fn check_for_global_solution(&self, initial_component_index: usize, initial_solution_index: usize) -> Option<GlobalSolution> {
+        // Small trivial test necessary (but not sufficient) for a global
+        // solution
+        for component in &self.local {
+            if !component.all_peers_present {
+                return None;
+            }
         }

         // Construct initial entry on stack
@@ -989,44 +1211,91 @@ impl SolutionCombiner {
         });
     }

-    /// Simple test if a solution is at all possible. If this returns true it
-    /// does not mean there actually is a solution.
-    fn can_have_solution(&self) -> bool {
-        for component in &self.local {
-            if !component.all_peers_present {
-                return false;
-            }
-        }
-
-        return true;
+    /// Checks if all preconditions for global sync failure have been met
+    fn check_for_global_failure(&self) -> bool {
+        if !self.failure_reported {
+            return false;
+        }
+
+        // Failure is reported, if all components are present then we may emit
+        // the global failure broadcast. Check if all are present and we're
+        // preparing to fail this round
+        let mut all_present = true;
+        for presence in &self.presence {
+            if !presence.both_sides_present {
+                all_present = false;
+                break;
+            }
+        }
+
+        return all_present; // && failure_reported, which is checked above
     }

-    /// Turns the entire (partially resolved) global solution back into local
-    /// solutions to ship to another component.
-    // TODO: Don't do this, kind of wasteful since a lot of processing has
-    //       already been performed.
-    fn drain(&mut self) -> Vec<LocalSolution> {
-        let mut reserve_len = 0;
-        for component in &self.local {
-            reserve_len += component.solutions.len();
+    /// Turns the entire (partially resolved) global solution into a structure
+    /// that can be forwarded to a new parent. The new parent may then merge
+    /// already obtained information.
+    fn drain(&mut self) -> Option<SolutionCombiner> {
+        if self.local.is_empty() && self.presence.is_empty() && !self.failure_reported {
+            return None;
         }

-        let mut solutions = Vec::with_capacity(reserve_len);
-        for component in self.local.drain(..) {
-            for solution in component.solutions {
-                solutions.push(LocalSolution{
-                    component: component.component,
-                    final_branch_id: solution.final_branch_id,
-                    port_mapping: solution.channel_mapping,
-                });
-            }
-        }
+        let result = SolutionCombiner{
+            local: self.local.clone(),
+            presence: self.presence.clone(),
+            failure_reported: self.failure_reported,
+        };
+
+        self.local.clear();
+        self.presence.clear();
+        self.failure_reported = false;
+        return Some(result);
+    }
+
+    // TODO: Entire routine is quite wasteful. Combine instead of doing all work
+    //       again.
+    fn combine(&mut self, combiner: SolutionCombiner) -> Option<LeaderConclusion> {
+        self.failure_reported = self.failure_reported || combiner.failure_reported;
+
+        // Handle local solutions
+        if self.local.is_empty() {
+            // Trivial case
+            self.local = combiner.local;
+        } else {
+            for local in combiner.local {
+                for matched in local.solutions {
+                    let local_solution = LocalSolution{
+                        component: local.component,
+                        final_branch_id: matched.final_branch_id,
+                        port_mapping: matched.channel_mapping,
+                    };
+                    let maybe_solution = self.add_solution_and_check_for_global_solution(local_solution);
+                    if let Some(global_solution) = maybe_solution {
+                        return Some(LeaderConclusion::Solution(global_solution));
+                    }
+                }
+            }
+        }
+
+        // Handle channel presence
+        if self.presence.is_empty() {
+            // Trivial case
+            self.presence = combiner.presence;
+        } else {
+            for presence in combiner.presence {
+                let global_failure = self.add_presence_and_check_for_global_failure(presence.added_by, &[presence.channel]);
+                if global_failure {
+                    return Some(LeaderConclusion::Failure);
+                }
+            }
+        }

-        return solutions;
+        return None;
     }

     fn clear(&mut self) {
         self.local.clear();
+        self.presence.clear();
+        self.failure_reported = false;
     }
 }
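The consensus code above distinguishes component-addressed traffic (SyncComp, routed by component ID) from port-addressed traffic (SyncPort, routed and potentially rerouted like data). The inbox definitions below introduce that split; here is a self-contained routing sketch, mirroring `get_message_target_port` in the scheduler changes further down (types simplified):

#[derive(Clone, Copy, Debug, PartialEq)] struct ComponentId(u32);
#[derive(Clone, Copy, Debug, PartialEq)] struct PortId(u32);

enum Message {
    SyncComp { target_component: ComponentId }, // addressed to a component
    SyncPort { target_port: PortId },           // addressed to a port
}

// Port-addressed messages expose a target port, so the scheduler can reroute
// them when the port has moved; component-addressed messages never reroute.
fn target_port(message: &Message) -> Option<PortId> {
    match message {
        Message::SyncComp { .. } => None,
        Message::SyncPort { target_port } => Some(*target_port),
    }
}

fn main() {
    let m = Message::SyncPort { target_port: PortId(3) };
    assert_eq!(target_port(&m), Some(PortId(3)));
    let m = Message::SyncComp { target_component: ComponentId(1) };
    assert_eq!(target_port(&m), None);
}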
diff --git a/src/runtime2/inbox.rs b/src/runtime2/inbox.rs
index 26983cc5a008de576fe4860a3c1e1b0fbf189777..c489561f96788af0e87b555f3c16628822032599 100644
--- a/src/runtime2/inbox.rs
+++ b/src/runtime2/inbox.rs
@@ -2,6 +2,8 @@ use std::sync::Mutex;
 use std::collections::VecDeque;

 use crate::protocol::eval::ValueGroup;
+use crate::runtime2::consensus::SolutionCombiner;
+use crate::runtime2::port::ChannelId;

 use super::ConnectorId;
 use super::branch::BranchId;
@@ -11,8 +13,8 @@ use super::port::PortIdLocal;

 // TODO: Remove Debug derive from all types
 #[derive(Debug, Copy, Clone)]
-pub(crate) struct PortAnnotation {
-    pub port_id: PortIdLocal,
+pub(crate) struct ChannelAnnotation {
+    pub channel_id: ChannelId,
     pub registered_id: Option<BranchMarker>,
     pub expected_firing: Option<bool>,
 }

@@ -50,7 +52,7 @@ pub(crate) struct SyncHeader {
 /// The header added to data messages
 #[derive(Debug, Clone)]
 pub(crate) struct DataHeader {
-    pub expected_mapping: Vec<PortAnnotation>,
+    pub expected_mapping: Vec<ChannelAnnotation>,
     pub sending_port: PortIdLocal,
     pub target_port: PortIdLocal,
     pub new_mapping: BranchMarker,
 }

@@ -87,19 +89,37 @@ pub(crate) struct DataMessage {
 }

 #[derive(Debug)]
-pub(crate) enum SyncContent {
+pub(crate) enum SyncCompContent {
+    LocalFailure, // notifying leader that component has failed (e.g. a timeout)
     LocalSolution(LocalSolution), // sending a local solution to the leader
+    PartialSolution(SolutionCombiner), // when new leader is detected, forward all local results
     GlobalSolution(GlobalSolution), // broadcasting to everyone
+    GlobalFailure, // broadcasting to everyone
+    AckFailure, // acknowledgement of failure to leader
     Notification, // just a notification (so purpose of message is to send the SyncHeader)
+    Presence(ConnectorId, Vec<ChannelId>), // notifying leader of component presence (needed to ensure failing a round involves all components in a sync round)
 }

 /// A sync message is a message that is intended only for the consensus
-/// algorithm.
+/// algorithm. The message goes directly to a component.
 #[derive(Debug)]
-pub(crate) struct SyncMessage {
+pub(crate) struct SyncCompMessage {
     pub sync_header: SyncHeader,
     pub target_component_id: ConnectorId,
-    pub content: SyncContent,
+    pub content: SyncCompContent,
+}
+
+#[derive(Debug)]
+pub(crate) enum SyncPortContent {
+    NotificationWave,
+}
+
+#[derive(Debug)]
+pub(crate) struct SyncPortMessage {
+    pub sync_header: SyncHeader,
+    pub source_port: PortIdLocal,
+    pub target_port: PortIdLocal,
+    pub content: SyncPortContent,
 }

 /// A control message is a message intended for the scheduler that is executing
@@ -123,7 +143,8 @@ pub(crate) enum ControlContent {
 #[derive(Debug)]
 pub(crate) enum Message {
     Data(DataMessage),
-    Sync(SyncMessage),
+    SyncComp(SyncCompMessage),
+    SyncPort(SyncPortMessage),
     Control(ControlMessage),
 }

diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs
index d3a3d9b736dae482f218da5772436b1c3c7ad8e4..92670a262990d897136c2a1317979d3d1848f258 100644
--- a/src/runtime2/native.rs
+++ b/src/runtime2/native.rs
@@ -5,6 +5,7 @@ use std::collections::HashMap;

 use crate::protocol::ComponentCreationError;
 use crate::protocol::eval::ValueGroup;
+use crate::runtime2::consensus::RoundConclusion;

 use super::{ConnectorKey, ConnectorId, RuntimeInner};
 use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState};
@@ -12,7 +13,11 @@ use super::scheduler::{SchedulerCtx, ComponentCtx};
 use super::port::{Port, PortIdLocal, Channel, PortKind};
 use super::consensus::{Consensus, Consistency, find_ports_in_value_group};
 use super::connector::{ConnectorScheduling, ConnectorPDL};
-use super::inbox::{Message, DataContent, DataMessage, SyncMessage, ControlContent, ControlMessage};
+use super::inbox::{
+    Message, DataContent, DataMessage,
+    SyncCompMessage, SyncPortMessage,
+    ControlContent, ControlMessage
+};

 /// Generic connector interface from the scheduler's point of view.
 pub(crate) trait Connector {
@@ -25,6 +30,7 @@ pub(crate) trait Connector {

 pub(crate) struct FinishedSync {
     // In the order of the `get` calls
+    success: bool,
     inbox: Vec<ValueGroup>,
 }

@@ -70,9 +76,9 @@ impl Connector for ConnectorApplication {
                 iter_id = self.tree.get_queue_next(branch_id);
                 self.last_finished_handled = Some(branch_id);

-                if let Some(solution_branch) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
+                if let Some(conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) {
                     // Can finish sync round immediately
-                    self.collapse_sync_to_solution_branch(solution_branch, comp_ctx);
+                    self.collapse_sync_to_conclusion(conclusion, comp_ctx);
                     return ConnectorScheduling::Immediate;
                 }
             }
@@ -108,7 +114,8 @@ impl ConnectorApplication {
         while let Some(message) = comp_ctx.read_next_message() {
             match message {
                 Message::Data(message) => self.handle_new_data_message(message, comp_ctx),
-                Message::Sync(message) => self.handle_new_sync_message(message, comp_ctx),
+                Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx),
+                Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx),
                 Message::Control(_) => unreachable!("control message in native API component"),
             }
         }
@@ -138,19 +145,23 @@ impl ConnectorApplication {
             let receiving_branch = &mut self.tree[receiving_branch_id];
             receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone());
-            self.consensus.notify_of_received_message(receiving_branch_id, &message);
+            self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx);

             // And prepare the branch for running
             self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);
         }
     }

-    pub(crate) fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) {
-        if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) {
-            self.collapse_sync_to_solution_branch(solution_branch_id, ctx);
+    pub(crate) fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) {
+        if let Some(conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) {
+            self.collapse_sync_to_conclusion(conclusion, ctx);
         }
     }

+    pub(crate) fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) {
+        self.consensus.handle_new_sync_port_message(message, ctx);
+    }
+
     fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {
         debug_assert!(self.is_in_sync);

@@ -214,7 +225,7 @@ impl ConnectorApplication {
                 branch.insert_message(port_id, message.content.as_message().unwrap().clone());

                 self.consensus.notify_of_new_branch(branch_id, receiving_branch_id);
-                self.consensus.notify_of_received_message(receiving_branch_id, &message);
+                self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx);
                 self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id);

                 any_message_received = true;
@@ -287,16 +298,21 @@ impl ConnectorApplication {
         return ConnectorScheduling::NotNow;
     }

-    fn collapse_sync_to_solution_branch(&mut self, branch_id: BranchId, comp_ctx: &mut ComponentCtx) {
-        debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program
+    fn collapse_sync_to_conclusion(&mut self, conclusion: RoundConclusion, comp_ctx: &mut ComponentCtx) {
         // Notifying tree, consensus algorithm and context of ending sync
         let mut fake_vec = Vec::new();
+
+        let (branch_id, success) = match conclusion {
+            RoundConclusion::Success(branch_id) => {
+                debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program provided by API
+                (branch_id, true)
+            },
+            RoundConclusion::Failure => (BranchId::new_invalid(), false),
+        };
+
         let mut solution_branch = self.tree.end_sync(branch_id);
         self.consensus.end_sync(branch_id, &mut fake_vec);
-
-        for port in fake_vec {
-            debug_assert!(comp_ctx.get_port_by_id(port).is_some());
-        }
+        debug_assert!(fake_vec.is_empty());

         comp_ctx.notify_sync_end(&[]);

@@ -320,7 +336,7 @@ impl ConnectorApplication {
         let (results, notification) = &*self.sync_done;
         let mut results = results.lock().unwrap();
-        *results = Some(FinishedSync{ inbox });
+        *results = Some(FinishedSync{ success, inbox });
         notification.notify_one();
     }
 }

@@ -345,6 +361,7 @@ pub enum ApplicationStartSyncError {
 #[derive(Debug)]
 pub enum ApplicationEndSyncError {
     NotInSync,
+    Failure,
 }

 pub enum ApplicationSyncAction {
@@ -496,7 +513,12 @@ impl ApplicationInterface {
         lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done
         self.is_in_sync = false;

-        return Ok(lock.take().unwrap().inbox);
+        let result = lock.take().unwrap();
+        if result.success {
+            return Ok(result.inbox);
+        } else {
+            return Err(ApplicationEndSyncError::Failure);
+        }
     }

     /// Called by runtime to set associated connector's ID.
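With the `success` flag on FinishedSync, a failed round now surfaces to the API caller as ApplicationEndSyncError::Failure instead of an inbox of values. A self-contained sketch of that result handling; `Interface` and `wait_sync_result` are hypothetical stand-ins for the ApplicationInterface method containing the wait-loop above:

#[derive(Debug)]
enum EndSyncError { NotInSync, Failure }

struct Interface { finished: Option<(bool, Vec<u64>)> } // (success, inbox)

impl Interface {
    fn wait_sync_result(&mut self) -> Result<Vec<u64>, EndSyncError> {
        // Mirrors the change above: hand back the inbox only on success,
        // otherwise surface the failed round as an error.
        let (success, inbox) = self.finished.take().ok_or(EndSyncError::NotInSync)?;
        if success { Ok(inbox) } else { Err(EndSyncError::Failure) }
    }
}

fn main() {
    let mut iface = Interface { finished: Some((false, Vec::new())) };
    assert!(matches!(iface.wait_sync_result(), Err(EndSyncError::Failure)));
}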
diff --git a/src/runtime2/scheduler.rs b/src/runtime2/scheduler.rs
index e822bdff686170be618cdaab571777e698bd80b6..911674012bf95991b546a585dc61ef9570afbc16 100644
--- a/src/runtime2/scheduler.rs
+++ b/src/runtime2/scheduler.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
 use std::sync::Arc;
 use std::sync::atomic::Ordering;

 use crate::protocol::eval::EvalError;
+use crate::runtime2::port::ChannelId;

 use super::{ScheduledConnector, RuntimeInner, ConnectorId, ConnectorKey};
 use super::port::{Port, PortState, PortIdLocal};
@@ -134,7 +135,7 @@ impl Scheduler {
             while let Some(message) = scheduled.public.inbox.take_message() {
                 // Check if the message has to be rerouted because we have moved the
                 // target port to another component.
-                self.debug_conn(connector_id, &format!("Handling message\n --- {:?}", message));
+                self.debug_conn(connector_id, &format!("Handling message\n --- {:#?}", message));
                 if let Some(target_port) = Self::get_message_target_port(&message) {
                     if let Some(other_component_id) = scheduled.router.should_reroute(target_port) {
                         self.debug_conn(connector_id, " ... Rerouting the message");
@@ -206,7 +207,7 @@ impl Scheduler {

             // Handling any messages that were sent
             while let Some(message) = scheduled.ctx.outbox.pop_front() {
-                self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:?}", message));
+                self.debug_conn(connector_id, &format!("Sending message [outbox] \n --- {:#?}", message));

                 let target_component_id = match &message {
                     Message::Data(content) => {
@@ -221,13 +222,22 @@ impl Scheduler {
                         port_desc.peer_connector
                     },
-                    Message::Sync(content) => {
+                    Message::SyncComp(content) => {
                         // Sync messages are always sent to a particular component,
                         // the sender must make sure it actually wants to send to
                         // the specified component (and is not using an inconsistent
                         // component ID associated with a port).
                         content.target_component_id
                     },
+                    Message::SyncPort(content) => {
+                        let port_desc = scheduled.ctx.get_port_by_id(content.source_port).unwrap();
+                        debug_assert_eq!(port_desc.peer_id, content.target_port);
+                        if port_desc.state == PortState::Closed {
+                            todo!("handle sending over a closed port")
+                        }
+
+                        port_desc.peer_connector
+                    },
                     Message::Control(_) => {
                         unreachable!("component sending control messages directly");
                     }
@@ -339,7 +349,8 @@ impl Scheduler {
     fn get_message_target_port(message: &Message) -> Option<PortIdLocal> {
         match message {
             Message::Data(data) => return Some(data.data_header.target_port),
-            Message::Sync(_) => {},
+            Message::SyncComp(_) => {},
+            Message::SyncPort(content) => return Some(content.target_port),
             Message::Control(control) => {
                 match &control.content {
                     ControlContent::PortPeerChanged(port_id, _) => return Some(*port_id),
@@ -354,11 +365,11 @@ impl Scheduler {

     // TODO: Remove, this is debugging stuff
     fn debug(&self, message: &str) {
-        // println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
+        println!("DEBUG [thrd:{:02} conn:  ]: {}", self.scheduler_id, message);
     }

     fn debug_conn(&self, conn: ConnectorId, message: &str) {
-        // println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
+        println!("DEBUG [thrd:{:02} conn:{:02}]: {}", self.scheduler_id, conn.0, message);
     }
 }

@@ -450,6 +461,10 @@ impl ComponentCtx {
         return self.ports.iter().find(|v| v.self_id == id);
     }

+    pub(crate) fn get_port_by_channel_id(&self, id: ChannelId) -> Option<&Port> {
+        return self.ports.iter().find(|v| v.channel_id == id);
+    }
+
     fn get_port_mut_by_id(&mut self, id: PortIdLocal) -> Option<&mut Port> {
         return self.ports.iter_mut().find(|v| v.self_id == id);
     }
@@ -528,10 +543,14 @@ impl ComponentCtx {
                 self.inbox_len_read += 1;
                 return Some(Message::Data(content.clone()));
             },
-            Message::Sync(_) => {
+            Message::SyncComp(_) => {
                 let message = self.inbox_messages.remove(self.inbox_len_read);
                 return Some(message);
             },
+            Message::SyncPort(_) => {
+                let message = self.inbox_messages.remove(self.inbox_len_read);
+                return Some(message);
+            },
             Message::Control(_) => unreachable!("control message ended up in component inbox"),
         }
     }

diff --git a/src/runtime2/tests/mod.rs b/src/runtime2/tests/mod.rs
index 3c2ce3c1a3b75f637a20b7534adf698a2395cefd..46cf663a10725b3578b54782bfb256bb79b8cc93 100644
--- a/src/runtime2/tests/mod.rs
+++ b/src/runtime2/tests/mod.rs
@@ -10,9 +10,14 @@ use crate::protocol::eval::*;
 use crate::runtime2::native::{ApplicationSyncAction};

 // Generic testing constants, use when appropriate to simplify stress-testing
-pub(crate) const NUM_THREADS: u32 = 3; // number of threads in runtime
-pub(crate) const NUM_INSTANCES: u32 = 7; // number of test instances constructed
-pub(crate) const NUM_LOOPS: u32 = 8; // number of loops within a single test (not used by all tests)
+// pub(crate) const NUM_THREADS: u32 = 3; // number of threads in runtime
+// pub(crate) const NUM_INSTANCES: u32 = 7; // number of test instances constructed
+// pub(crate) const NUM_LOOPS: u32 = 8; // number of loops within a single test (not used by all tests)
+
+pub(crate) const NUM_THREADS: u32 = 1;
+pub(crate) const NUM_INSTANCES: u32 = 1;
+pub(crate) const NUM_LOOPS: u32 = 1;

 fn create_runtime(pdl: &str) -> Runtime {
     let protocol = ProtocolDescription::parse(pdl.as_bytes()).expect("parse pdl");