diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs index 265ef4f799deac03885e89be24fe072935123a7b..b716d3fa20090f606e03cd887197bcb116e49753 100644 --- a/src/runtime2/connector2.rs +++ b/src/runtime2/connector2.rs @@ -36,7 +36,7 @@ use crate::protocol::{RunContext, RunResult}; use crate::runtime2::consensus::find_ports_in_value_group; use crate::runtime2::port::PortKind; -use super::branch::{Branch, BranchId, ExecTree, QueueKind, SpeculativeState}; +use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; use super::consensus::{Consensus, Consistency}; use super::inbox2::{DataMessageFancy, MessageFancy, SyncMessageFancy, PublicInbox}; use super::native::Connector; @@ -145,9 +145,10 @@ impl ConnectorPDL { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. debug_assert!(ctx.workspace_branches.is_empty()); - self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut ctx.workspace_branches); + let mut branches = Vec::new(); // TODO: @Remove + self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches); - for branch_id in ctx.workspace_branches.drain(..) { + for branch_id in branches.drain(..) { // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); @@ -185,7 +186,7 @@ impl ConnectorPDL { branch_id, consensus: &self.consensus, received: &branch.inbox, - scheduler: *sched_ctx, + scheduler: sched_ctx, prepared_channel: branch.prepared_channel.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -308,7 +309,7 @@ impl ConnectorPDL { branch_id: branch.id, consensus: &self.consensus, received: &branch.inbox, - scheduler: *sched_ctx, + scheduler: sched_ctx, prepared_channel: branch.prepared_channel.take(), }; let run_result = branch.code_state.run(&mut run_context, &sched_ctx.runtime.protocol_description); @@ -320,9 +321,9 @@ impl ConnectorPDL { return ConnectorScheduling::Exit; }, RunResult::ComponentAtSyncStart => { - let current_ports = comp_ctx.notify_sync_start(); + comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); - self.consensus.start_sync(current_ports, comp_ctx); + self.consensus.start_sync(comp_ctx); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); return ConnectorScheduling::Immediate;