diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index 53ed270193deb762acee1587609efe37beb04349..3899d7d4c6bd87e943fad19518069369204f4a99 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -36,7 +36,7 @@ use crate::protocol::{RunContext, RunResult}; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState, PreparedStatement}; use super::consensus::{Consensus, Consistency, RoundConclusion, find_ports_in_value_group}; -use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, PublicInbox}; +use super::inbox::{DataMessage, Message, SyncCompMessage, SyncPortMessage, SyncControlMessage, PublicInbox}; use super::native::Connector; use super::port::{PortKind, PortIdLocal}; use super::scheduler::{ComponentCtx, SchedulerCtx}; @@ -159,8 +159,8 @@ impl Connector for ConnectorPDL { return scheduling; }, Mode::SyncError => { - todo!("write"); - return ConnectorScheduling::Exit; + let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); + return scheduling; }, Mode::Error => { // This shouldn't really be called. Because when we reach exit @@ -194,6 +194,11 @@ impl ConnectorPDL { } }, Message::SyncPort(message) => self.handle_new_sync_port_message(message, ctx), + Message::SyncControl(message) => { + if let Some(result) = self.handle_new_sync_control_message(message, ctx) { + return Some(result); + } + }, Message::Control(_) => unreachable!("control message in component"), } } @@ -244,6 +249,14 @@ impl ConnectorPDL { self.consensus.handle_new_sync_port_message(message, ctx); } + pub fn handle_new_sync_control_message(&mut self, message: SyncControlMessage, ctx: &mut ComponentCtx) -> Option { + if let Some(round_conclusion) = self.consensus.handle_new_sync_control_message(message, ctx) { + return Some(self.enter_non_sync_mode(round_conclusion, ctx)); + } + + return None; + } + // --- Running code pub fn run_in_sync_mode(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling {