diff --git a/src/runtime2/connector.rs b/src/runtime2/connector.rs index bdda7503cfdd8efaac4edfa57a0e2365ff67c1cc..28756d719a417835b7988cfe2b8e6b33cd994fa1 100644 --- a/src/runtime2/connector.rs +++ b/src/runtime2/connector.rs @@ -66,6 +66,7 @@ pub(crate) enum ConnectorScheduling { pub(crate) struct ConnectorPDL { tree: ExecTree, consensus: Consensus, + last_finished_handled: Option, } struct ConnectorRunContext<'a> { @@ -106,13 +107,26 @@ impl Connector for ConnectorPDL { fn run(&mut self, sched_ctx: SchedulerCtx, comp_ctx: &mut ComponentCtx) -> ConnectorScheduling { self.handle_new_messages(comp_ctx); if self.tree.is_in_sync() { + // Run in sync mode let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); - if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { - self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); - return ConnectorScheduling::Immediate; - } else { - return scheduling + + // Handle any new finished branches + let mut iter_id = self.last_finished_handled.or(self.tree.get_queue_first(QueueKind::FinishedSync)); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + self.last_finished_handled = Some(branch_id); + + + if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branch(branch_id, comp_ctx) { + // Actually found a solution + self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); + return ConnectorScheduling::Immediate; + } + + self.last_finished_handled = Some(branch_id); } + + return scheduling; } else { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); return scheduling; @@ -125,6 +139,7 @@ impl ConnectorPDL { Self{ tree: ExecTree::new(initial), consensus: Consensus::new(), + last_finished_handled: None, } } @@ -143,21 +158,26 @@ impl ConnectorPDL { pub fn handle_new_data_message(&mut self, message: DataMessage, ctx: &mut ComponentCtx) { // Go through all branches that are awaiting new messages and see if // there is one that can receive this message. - debug_assert!(ctx.workspace_branches.is_empty()); - let mut branches = Vec::new(); // TODO: @Remove - if !self.consensus.handle_new_data_message(&self.tree, &message, ctx, &mut branches) { + if !self.consensus.handle_new_data_message(&message, ctx) { // Old message, so drop it return; } - for branch_id in branches.drain(..) { + let mut iter_id = self.tree.get_queue_first(QueueKind::AwaitingMessage); + while let Some(branch_id) = iter_id { + iter_id = self.tree.get_queue_next(branch_id); + + let branch = &self.tree[branch_id]; + if branch.awaiting_port != message.data_header.target_port { continue; } + if !self.consensus.branch_can_receive(branch_id, &message) { continue; } + // This branch can receive, so fork and given it the message let receiving_branch_id = self.tree.fork_branch(branch_id); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); + self.consensus.notify_of_received_message(receiving_branch_id, &message); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); @@ -241,7 +261,7 @@ impl ConnectorPDL { // a message that targets this branch, so check now. let mut any_message_received = false; for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message.sync_header, &message.data_header, &message.content) { + if self.consensus.branch_can_receive(branch_id, &message) { // This branch can receive the message, so we do the // fork-and-receive dance let receiving_branch_id = self.tree.fork_branch(branch_id); @@ -250,7 +270,7 @@ impl ConnectorPDL { branch.insert_message(port_id, message.content.as_message().unwrap().clone()); self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); - self.consensus.notify_of_received_message(receiving_branch_id, &message.sync_header, &message.data_header, &message.content); + self.consensus.notify_of_received_message(receiving_branch_id, &message); self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_message_received = true; @@ -327,6 +347,7 @@ impl ConnectorPDL { RunResult::ComponentAtSyncStart => { comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); + debug_assert!(self.last_finished_handled.is_none()); self.consensus.start_sync(comp_ctx); self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); @@ -381,5 +402,6 @@ impl ConnectorPDL { } ctx.notify_sync_end(&[]); + self.last_finished_handled = None; } } \ No newline at end of file