diff --git a/src/runtime2/connector2.rs b/src/runtime2/connector2.rs index b716d3fa20090f606e03cd887197bcb116e49753..f33e6c5c2bb5bdbcfa4949ebb9fe7ee0c7de128c 100644 --- a/src/runtime2/connector2.rs +++ b/src/runtime2/connector2.rs @@ -34,6 +34,7 @@ use crate::common::ComponentState; use crate::protocol::eval::{Prompt, Value, ValueGroup}; use crate::protocol::{RunContext, RunResult}; use crate::runtime2::consensus::find_ports_in_value_group; +use crate::runtime2::inbox2::DataContent; use crate::runtime2::port::PortKind; use super::branch::{BranchId, ExecTree, QueueKind, SpeculativeState}; @@ -110,10 +111,11 @@ impl Connector for ConnectorPDL { if self.tree.is_in_sync() { let scheduling = self.run_in_sync_mode(sched_ctx, comp_ctx); if let Some(solution_branch_id) = self.consensus.handle_new_finished_sync_branches(&self.tree, comp_ctx) { - todo!("call handler"); + self.collapse_sync_to_solution_branch(solution_branch_id, comp_ctx); + return ConnectorScheduling::Immediate; + } else { + return scheduling } - - return scheduling; } else { let scheduling = self.run_in_deterministic_mode(sched_ctx, comp_ctx); return scheduling; @@ -154,8 +156,8 @@ impl ConnectorPDL { self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); let receiving_branch = &mut self.tree[receiving_branch_id]; - receiving_branch.insert_message(message.data_header.target_port, message.content.clone()); - self.consensus.notify_of_received_message(branch_id, &message.data_header, &message.content); + receiving_branch.insert_message(message.data_header.target_port, message.content.as_message().unwrap().clone()); + self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); // And prepare the branch for running self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); @@ -164,7 +166,7 @@ impl ConnectorPDL { pub fn handle_new_sync_message(&mut self, message: SyncMessageFancy, ctx: &mut ComponentCtxFancy) { if let Some(solution_branch_id) = self.consensus.handle_new_sync_message(message, ctx) { - + self.collapse_sync_to_solution_branch(solution_branch_id, ctx); } } @@ -239,16 +241,17 @@ impl ConnectorPDL { // a message that targets this branch, so check now. let mut any_branch_received = false; for message in comp_ctx.get_read_data_messages(port_id) { - if self.consensus.branch_can_receive(branch_id, &message.data_header) { + if self.consensus.branch_can_receive(branch_id, &message.data_header, &message.content) { // This branch can receive the message, so we do the // fork-and-receive dance - let recv_branch_id = self.tree.fork_branch(branch_id); - let branch = &mut self.tree[recv_branch_id]; - branch.insert_message(port_id, message.content.clone()); + let receiving_branch_id = self.tree.fork_branch(branch_id); + let branch = &mut self.tree[receiving_branch_id]; + + branch.insert_message(port_id, message.content.as_message().unwrap().clone()); - self.consensus.notify_of_new_branch(branch_id, recv_branch_id); - self.consensus.notify_of_received_message(recv_branch_id, &message.data_header, &message.content); - self.tree.push_into_queue(QueueKind::Runnable, recv_branch_id); + self.consensus.notify_of_new_branch(branch_id, receiving_branch_id); + self.consensus.notify_of_received_message(receiving_branch_id, &message.data_header, &message.content); + self.tree.push_into_queue(QueueKind::Runnable, receiving_branch_id); any_branch_received = true; } @@ -267,7 +270,7 @@ impl ConnectorPDL { branch.sync_state = SpeculativeState::ReachedSyncEnd; self.tree.push_into_queue(QueueKind::FinishedSync, branch_id); } else if consistency == Consistency::Inconsistent { - branch.sync_state == SpeculativeState::Inconsistent; + branch.sync_state = SpeculativeState::Inconsistent; } }, RunResult::BranchPut(port_id, content) => { @@ -278,7 +281,8 @@ impl ConnectorPDL { // `put()` is valid. let (sync_header, data_header) = self.consensus.handle_message_to_send(branch_id, port_id, &content, comp_ctx); comp_ctx.submit_message(MessageFancy::Data(DataMessageFancy{ - sync_header, data_header, content + sync_header, data_header, + content: DataContent::Message(content), })); self.tree.push_into_queue(QueueKind::Runnable, branch_id); @@ -324,6 +328,7 @@ impl ConnectorPDL { comp_ctx.notify_sync_start(); let sync_branch_id = self.tree.start_sync(); self.consensus.start_sync(comp_ctx); + self.consensus.notify_of_new_branch(BranchId::new_invalid(), sync_branch_id); self.tree.push_into_queue(QueueKind::Runnable, sync_branch_id); return ConnectorScheduling::Immediate;