diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs index 049eaa42781c38608098e5b5a1a454456a5f142f..265ef4f799deac03885e89be24fe072935123a7b 100644 --- a/src/runtime2/connector2.rs +++ b/src/runtime2/connector2.rs @@ -38,8 +38,7 @@ use crate::runtime2::port::PortKind; use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState}; use super::consensus::{Consensus, Consistency}; -use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy}; -use super::inbox::PublicInbox; +use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy, PublicInbox}; use super::native::Connector; use super::port::PortIdLocal; use super::scheduler::{ComponentCtxFancy, SchedulerCtx}; @@ -79,7 +78,7 @@ struct ConnectorRunContext<'a> { prepared_channel: Option<(Value, Value)>, } -impl RunContext for ConnectorRunContext{ +impl<'a> RunContext for ConnectorRunContext<'a>{ fn did_put(&mut self, port: PortId) -> bool { let port_id = PortIdLocal::new(port.0.u32_suffix); let annotation = self.consensus.get_annotation(self.branch_id, port_id); @@ -110,7 +109,10 @@ impl Connector for ConnectorPDL { self.handle_new_messages(comp_ctx); if self.tree.is_in_sync() { let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); - self.consensus.handle_new_finished_sync_branches(); + if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { + todo!("call handler"); + } + return scheduling; } else { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); @@ -132,8 +134,8 @@ impl ConnectorPDL { pub fn handle_new_messages(&mut self, ctx: &mut ComponentCtxFancy) { while let Some(message) = ctx.read_next_message() { match message { - MessageFancy::Data(message) => handle_new_data_message(message, ctx), - MessageFancy::Sync(message) => handle_new_sync_message(message, ctx), + MessageFancy::Data(message) => self.handle_new_data_message(message, ctx), + MessageFancy::Sync(message) => self.handle_new_sync_message(message, ctx), MessageFancy::Control(_) => unreachable!("control message in component"), } } @@ -143,8 +145,7 @@ impl ConnectorPDL { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. debug_assert!(ctx.workspace_branches.is_empty()); - self.consensus.handle_received_sync_header(&message.sync_header, ctx); - self.consensus.handle_received_data_header(&self.tree, &message.data_header, &mut ctx.workspace_branches); + self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut ctx.workspace_branches); for branch_id in ctx.workspace_branches.drain(..) { // This branch can receive, so fork and given it the message @@ -161,8 +162,9 @@ impl ConnectorPDL { } pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) { - self.consensus.handle_received_sync_header(&message.sync_header, ctx); - self.consensus.handle_received_sync_message(message, ctx); + if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { + + } } // --- Running code @@ -303,7 +305,7 @@ impl ConnectorPDL { debug_assert!(branch.sync_state == SpeculativeState::RunningNonSync); let mut run_context = ConnectorRunContext{ - branch_id, + branch_id: branch.id, consensus: &self.consensus, received: &branch.inbox, scheduler: *sched_ctx, @@ -320,7 +322,7 @@ impl ConnectorPDL { RunResult::ComponentAtSyncStart => { let current_ports = comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); - self.consensus.start_sync(current_ports); + self.consensus.start_sync(current_ports, comp_ctx); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); return ConnectorScheduling::Immediate; @@ -361,4 +363,17 @@ impl ConnectorPDL { _ => unreachable!("unexpected run result '{:?}' while running in non-sync mode", run_result), } } + + pub fn collapse_sync_to_solution_branch(&mut self, solution_branch_id: BranchId, ctx: &mut ComponentCtxFancy) { + let mut fake_vec = Vec::new(); + self.tree.end_sync(solution_branch_id); + self.consensus.end_sync(solution_branch_id, &mut fake_vec); + + for port in fake_vec { + // TODO: Handle sent/received ports + debug_assert!(ctx.get_port_by_id(port).is_some()); + } + + ctx.notify_sync_end(&[]); + } } \ No newline at end of file