diff --git a/src/runtime2/native.rs b/src/runtime2/native.rs index d3a3d9b736dae482f218da5772436b1c3c7ad8e4..92670a262990d897136c2a1317979d3d1848f258 100644 --- a/src/runtime2/native.rs +++ b/src/runtime2/native.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use crate::protocol::ComponentCreationError; use crate::protocol::eval::ValueGroup; +use crate::runtime2::consensus::RoundConclusion; use super::{ConnectorKey, ConnectorId, RuntimeInner}; use super::branch::{BranchId, FakeTree, QueueKind, SpeculativeState}; @@ -12,7 +13,11 @@ use super::scheduler::{SchedulerCtx, ComponentCtx}; use super::port::{Port, PortIdLocal, Channel, PortKind}; use super::consensus::{Consensus, Consistency, find_ports_in_value_group}; use super::connector::{ConnectorScheduling, ConnectorPDL}; -use super::inbox::{Message, DataContent, DataMessage, SyncMessage, ControlContent, ControlMessage}; +use super::inbox::{ + Message, DataContent, DataMessage, + SyncCompMessage, SyncPortMessage, + ControlContent, ControlMessage +}; /// Generic connector interface from the scheduler's point of view. pub(crate) trait Connector { @@ -25,6 +30,7 @@ pub(crate) trait Connector { pub(crate) struct FinishedSync { // In the order of the `get` calls + success: bool, inbox: Vec, } @@ -70,9 +76,9 @@ impl Connector for ConnectorApplication { iter_id = self.tree.get_queue_next(branch_id); self.last_finished_handled = Some(branch_id); - if let Some(solution_branch) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + if let Some(conclusion) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { // Can finish sync round immediately - self.collapse_sync_to_solution_branch(solution_branch, comp_ctx); + self.collapse_sync_to_conclusion(conclusion, comp_ctx); return ConnectorScheduling::Immediate; } } @@ -108,7 +114,8 @@ impl ConnectorApplication { while let Some(message) = comp_ctx.read_next_message() { match message { Message::Data(message) => self.handle_new_data_message(message, comp_ctx), - Message::Sync(message) => self.handle_new_sync_message(message, comp_ctx), + Message::SyncComp(message) => self.handle_new_sync_comp_message(message, comp_ctx), + Message::SyncPort(message) => self.handle_new_sync_port_message(message, comp_ctx), Message::Control(_) => unreachable!("control message in native API component"), } } @@ -138,19 +145,23 @@ impl ConnectorApplication { let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.consensus.notify_of_received_message(receiving_branch_id, &message, ctx); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); } } - pub(crate) fn handle_new_sync_message(&mut self, message: SyncMessage, ctx: &mut ComponentCtx) { - if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { - self.collapse_sync_to_solution_branch(solution_branch_id, ctx); + pub(crate) fn handle_new_sync_comp_message(&mut self, message: SyncCompMessage, ctx: &mut ComponentCtx) { + if let Some(conclusion) = self.consensus.handle_new_sync_comp_message(message, ctx) { + self.collapse_sync_to_conclusion(conclusion, ctx); } } + pub(crate) fn handle_new_sync_port_message(&mut self, message: SyncPortMessage, ctx: &mut ComponentCtx) { + self.consensus.handle_new_sync_port_message(message, ctx); + } + fn run_in_sync_mode(&mut self, _sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { debug_assert!(self.is_in_sync); @@ -214,7 +225,7 @@ impl ConnectorApplication { branch.insert_message(port_id, message.content.as_message().unwrap().clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message); + self.consensus.notify_of_received_message(receiving_branch_id, &message, comp_ctx); self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_message_received = true; @@ -287,16 +298,21 @@ impl ConnectorApplication { return ConnectorScheduling::NotNow; } - fn collapse_sync_to_solution_branch(&mut self, branch_id: BranchId, comp_ctx: &mut ComponentCtx) { - debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program + fn collapse_sync_to_conclusion(&mut self, conclusion: RoundConclusion, comp_ctx: &mut ComponentCtx) { // Notifying tree, consensus algorithm and context of ending sync let mut fake_vec = Vec::new(); + + let (branch_id, success) = match conclusion { + RoundConclusion::Success(branch_id) => { + debug_assert!(self.branch_extra[branch_id.index as usize] >= self.sync_desc.len()); // finished program provided by API + (branch_id, true) + }, + RoundConclusion::Failure => (BranchId::new_invalid(), false), + }; + let mut solution_branch = self.tree.end_sync(branch_id); self.consensus.end_sync(branch_id, &mut fake_vec); - - for port in fake_vec { - debug_assert!(comp_ctx.get_port_by_id(port).is_some()); - } + debug_assert!(fake_vec.is_empty()); comp_ctx.notify_sync_end(&[]); @@ -320,7 +336,7 @@ impl ConnectorApplication { let (results, notification) = &*self.sync_done; let mut results = results.lock().unwrap(); - *results = Some(FinishedSync{ inbox }); + *results = Some(FinishedSync{ success, inbox }); notification.notify_one(); } } @@ -345,6 +361,7 @@ pub enum ApplicationStartSyncError { #[derive(Debug)] pub enum ApplicationEndSyncError { NotInSync, + Failure, } pub enum ApplicationSyncAction { @@ -496,7 +513,12 @@ impl ApplicationInterface { lock = condition.wait_while(lock, |v| v.is_none()).unwrap(); // wait while not done self.is_in_sync = false; - return Ok(lock.take().unwrap().inbox); + let result = lock.take().unwrap(); + if result.success { + return Ok(result.inbox); + } else { + return Err(ApplicationEndSyncError::Failure); + } } /// Called by runtime to set associated connector's ID.